diff --git a/src/sagemaker/fw_utils.py b/src/sagemaker/fw_utils.py index a90e5608c3..7fddc40757 100644 --- a/src/sagemaker/fw_utils.py +++ b/src/sagemaker/fw_utils.py @@ -219,9 +219,11 @@ def framework_name_from_image(image_name): else: # extract framework, python version and image tag # We must support both the legacy and current image name format. - name_pattern = \ - re.compile('^sagemaker(?:-rl)?-(tensorflow|mxnet|chainer|pytorch|scikit-learn):(.*)-(.*?)-(py2|py3)$') - legacy_name_pattern = re.compile('^sagemaker-(tensorflow|mxnet)-(py2|py3)-(cpu|gpu):(.*)$') + name_pattern = re.compile( + r'^sagemaker(?:-rl)?-(tensorflow|mxnet|chainer|pytorch|scikit-learn):(.*)-(.*?)-(py2|py3)$') + legacy_name_pattern = re.compile( + r'^sagemaker-(tensorflow|mxnet)-(py2|py3)-(cpu|gpu):(.*)$') + name_match = name_pattern.match(sagemaker_match.group(8)) legacy_match = legacy_name_pattern.match(sagemaker_match.group(8)) diff --git a/src/sagemaker/tensorflow/serving.py b/src/sagemaker/tensorflow/serving.py index c1b41d76a4..dd923ea0ef 100644 --- a/src/sagemaker/tensorflow/serving.py +++ b/src/sagemaker/tensorflow/serving.py @@ -13,7 +13,6 @@ from __future__ import absolute_import import logging - import sagemaker from sagemaker.content_types import CONTENT_TYPE_JSON from sagemaker.fw_utils import create_image_uri @@ -144,7 +143,6 @@ def _get_image_uri(self, instance_type): if self.image: return self.image - # reuse standard image uri function, then strip unwanted python component region_name = self.sagemaker_session.boto_region_name return create_image_uri(region_name, Model.FRAMEWORK_NAME, instance_type, self._framework_version) diff --git a/src/sagemaker/utils.py b/src/sagemaker/utils.py index fa1a4b216e..750b3c07a2 100644 --- a/src/sagemaker/utils.py +++ b/src/sagemaker/utils.py @@ -14,6 +14,7 @@ import errno import os +import random import re import sys import tarfile @@ -64,6 +65,14 @@ def name_from_base(base, max_length=63, short=False): return '{}-{}'.format(trimmed_base, timestamp) +def unique_name_from_base(base, max_length=63): + unique = '%04x' % random.randrange(16**4) # 4-digit hex + ts = str(int(time.time())) + available_length = max_length - 2 - len(ts) - len(unique) + trimmed = base[:available_length] + return '{}-{}-{}'.format(trimmed, ts, unique) + + def airflow_name_from_base(base, max_length=63, short=False): """Append airflow execution_date macro (https://airflow.apache.org/code.html?#macros) to the provided string. The macro will beevaluated in Airflow operator runtime. diff --git a/tests/integ/local_mode_utils.py b/tests/integ/local_mode_utils.py new file mode 100644 index 0000000000..31bf1ed061 --- /dev/null +++ b/tests/integ/local_mode_utils.py @@ -0,0 +1,38 @@ +# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import + +import fcntl +import os +import time +from contextlib import contextmanager + +import tests.integ + +LOCK_PATH = os.path.join(tests.integ.DATA_DIR, 'local_mode_lock') + + +@contextmanager +def lock(): + # Since Local Mode uses the same port for serving, we need a lock in order + # to allow concurrent test execution. + local_mode_lock_fd = open(LOCK_PATH, 'w') + local_mode_lock = local_mode_lock_fd.fileno() + + fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) + + try: + yield + finally: + time.sleep(5) + fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index 88b6064d87..8995548c12 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -12,22 +12,20 @@ # language governing permissions and limitations under the License. from __future__ import absolute_import -import fcntl import os -import time import boto3 import numpy import pytest +import tests.integ.local_mode_utils as local_mode_utils +from tests.integ import DATA_DIR, PYTHON_VERSION +from tests.integ.timeout import timeout from sagemaker.local import LocalSession, LocalSagemakerRuntimeClient, LocalSagemakerClient from sagemaker.mxnet import MXNet from sagemaker.tensorflow import TensorFlow -from tests.integ import DATA_DIR, PYTHON_VERSION -from tests.integ.timeout import timeout DATA_PATH = os.path.join(DATA_DIR, 'iris', 'data') -LOCK_PATH = os.path.join(DATA_DIR, 'local_mode_lock') DEFAULT_REGION = 'us-west-2' @@ -35,6 +33,7 @@ class LocalNoS3Session(LocalSession): """ This Session sets local_code: True regardless of any config file settings """ + def __init__(self): super(LocalSession, self).__init__() @@ -74,13 +73,12 @@ def _create_model(output_path): mx.fit({'train': train_input, 'test': test_input}) model = mx.create_model(1) return model + return _create_model @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") def test_tf_local_mode(tf_full_version, sagemaker_local_session): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() with timeout(minutes=5): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') @@ -101,32 +99,25 @@ def test_tf_local_mode(tf_full_version, sagemaker_local_session): print('job succeeded: {}'.format(estimator.latest_training_job.name)) endpoint_name = estimator.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. 
- fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - json_predictor = estimator.deploy(initial_instance_count=1, - instance_type='local', - endpoint_name=endpoint_name) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({'inputs': features}) - print('predict result: {}'.format(dict_result)) - list_result = json_predictor.predict(features) - print('predict result: {}'.format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + json_predictor = estimator.deploy(initial_instance_count=1, + instance_type='local', + endpoint_name=endpoint_name) + + features = [6.4, 3.2, 4.5, 1.5] + dict_result = json_predictor.predict({'inputs': features}) + print('predict result: {}'.format(dict_result)) + list_result = json_predictor.predict(features) + print('predict result: {}'.format(list_result)) + + assert dict_result == list_result + finally: + estimator.delete_endpoint() @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") def test_tf_distributed_local_mode(sagemaker_local_session): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() with timeout(minutes=5): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') @@ -146,32 +137,25 @@ def test_tf_distributed_local_mode(sagemaker_local_session): endpoint_name = estimator.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - json_predictor = estimator.deploy(initial_instance_count=1, - instance_type='local', - endpoint_name=endpoint_name) + with local_mode_utils.lock(): + try: + json_predictor = estimator.deploy(initial_instance_count=1, + instance_type='local', + endpoint_name=endpoint_name) - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({'inputs': features}) - print('predict result: {}'.format(dict_result)) - list_result = json_predictor.predict(features) - print('predict result: {}'.format(list_result)) + features = [6.4, 3.2, 4.5, 1.5] + dict_result = json_predictor.predict({'inputs': features}) + print('predict result: {}'.format(dict_result)) + list_result = json_predictor.predict(features) + print('predict result: {}'.format(list_result)) - assert dict_result == list_result - finally: - estimator.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + assert dict_result == list_result + finally: + estimator.delete_endpoint() @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") def test_tf_local_data(sagemaker_local_session): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() with timeout(minutes=5): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') @@ -190,33 +174,26 @@ def test_tf_local_data(sagemaker_local_session): print('job succeeded: {}'.format(estimator.latest_training_job.name)) endpoint_name = estimator.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. 
- fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - json_predictor = estimator.deploy(initial_instance_count=1, - instance_type='local', - endpoint_name=endpoint_name) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({'inputs': features}) - print('predict result: {}'.format(dict_result)) - list_result = json_predictor.predict(features) - print('predict result: {}'.format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + json_predictor = estimator.deploy(initial_instance_count=1, + instance_type='local', + endpoint_name=endpoint_name) + + features = [6.4, 3.2, 4.5, 1.5] + dict_result = json_predictor.predict({'inputs': features}) + print('predict result: {}'.format(dict_result)) + list_result = json_predictor.predict(features) + print('predict result: {}'.format(list_result)) + + assert dict_result == list_result + finally: + estimator.delete_endpoint() @pytest.mark.skipif(PYTHON_VERSION != 'py2', reason="TensorFlow image supports only python 2.") def test_tf_local_data_local_script(): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() with timeout(minutes=5): - script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') estimator = TensorFlow(entry_point=script_path, @@ -235,79 +212,56 @@ def test_tf_local_data_local_script(): print('job succeeded: {}'.format(estimator.latest_training_job.name)) endpoint_name = estimator.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - json_predictor = estimator.deploy(initial_instance_count=1, - instance_type='local', - endpoint_name=endpoint_name) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({'inputs': features}) - print('predict result: {}'.format(dict_result)) - list_result = json_predictor.predict(features) - print('predict result: {}'.format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + json_predictor = estimator.deploy(initial_instance_count=1, + instance_type='local', + endpoint_name=endpoint_name) + features = [6.4, 3.2, 4.5, 1.5] + dict_result = json_predictor.predict({'inputs': features}) + print('predict result: {}'.format(dict_result)) + list_result = json_predictor.predict(features) + print('predict result: {}'.format(list_result)) -def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model, mxnet_full_version): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() + assert dict_result == list_result + finally: + estimator.delete_endpoint() + +def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model, mxnet_full_version): path = 's3://%s' % sagemaker_local_session.default_bucket() s3_model = mxnet_model(path) s3_model.sagemaker_session = sagemaker_local_session predictor = None - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. 
- fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - predictor = s3_model.deploy(initial_instance_count=1, instance_type='local') - data = numpy.zeros(shape=(1, 1, 28, 28)) - predictor.predict(data) - finally: - if predictor: - predictor.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + predictor = s3_model.deploy(initial_instance_count=1, instance_type='local') + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + finally: + if predictor: + predictor.delete_endpoint() def test_local_mode_serving_from_local_model(tmpdir, sagemaker_local_session, mxnet_model): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() predictor = None - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - path = 'file://%s' % (str(tmpdir)) - model = mxnet_model(path) - model.sagemaker_session = sagemaker_local_session - predictor = model.deploy(initial_instance_count=1, instance_type='local') - data = numpy.zeros(shape=(1, 1, 28, 28)) - predictor.predict(data) - finally: - if predictor: - predictor.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + path = 'file://%s' % (str(tmpdir)) + model = mxnet_model(path) + model.sagemaker_session = sagemaker_local_session + predictor = model.deploy(initial_instance_count=1, instance_type='local') + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + finally: + if predictor: + predictor.delete_endpoint() def test_mxnet_local_mode(sagemaker_local_session, mxnet_full_version): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() - script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') data_path = os.path.join(DATA_DIR, 'mxnet_mnist') @@ -322,24 +276,17 @@ def test_mxnet_local_mode(sagemaker_local_session, mxnet_full_version): mx.fit({'train': train_input, 'test': test_input}) endpoint_name = mx.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. The serving test is really fast so it still - # makes sense to allow this behavior. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - predictor = mx.deploy(1, 'local', endpoint_name=endpoint_name) - data = numpy.zeros(shape=(1, 1, 28, 28)) - predictor.predict(data) - finally: - mx.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + try: + predictor = mx.deploy(1, 'local', endpoint_name=endpoint_name) + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + finally: + mx.delete_endpoint() -def test_mxnet_local_data_local_script(): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() +def test_mxnet_local_data_local_script(): data_path = os.path.join(DATA_DIR, 'mxnet_mnist') script_path = os.path.join(data_path, 'mnist_framework_mode.py') @@ -352,23 +299,17 @@ def test_mxnet_local_data_local_script(): mx.fit({'train': train_input, 'test': test_input}) endpoint_name = mx.latest_training_job.name - try: - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. 
The serving test is really fast so it still - # makes sense to allow this behavior. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - predictor = mx.deploy(1, 'local', endpoint_name=endpoint_name) - data = numpy.zeros(shape=(1, 1, 28, 28)) - predictor.predict(data) - finally: - mx.delete_endpoint() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + + with local_mode_utils.lock(): + try: + predictor = mx.deploy(1, 'local', endpoint_name=endpoint_name) + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + finally: + mx.delete_endpoint() def test_local_transform_mxnet(sagemaker_local_session, tmpdir, mxnet_full_version): - local_mode_lock_fd = open(LOCK_PATH, 'w') - local_mode_lock = local_mode_lock_fd.fileno() data_path = os.path.join(DATA_DIR, 'mxnet_mnist') script_path = os.path.join(data_path, 'mnist.py') @@ -393,12 +334,8 @@ def test_local_transform_mxnet(sagemaker_local_session, tmpdir, mxnet_full_versi transformer = mx.transformer(1, 'local', assemble_with='Line', max_payload=1, strategy='SingleRecord', output_path=output_path) - # Since Local Mode uses the same port for serving, we need a lock in order - # to allow concurrent test execution. - fcntl.lockf(local_mode_lock, fcntl.LOCK_EX) - transformer.transform(transform_input, content_type='text/csv', split_type='Line') - transformer.wait() - time.sleep(5) - fcntl.lockf(local_mode_lock, fcntl.LOCK_UN) + with local_mode_utils.lock(): + transformer.transform(transform_input, content_type='text/csv', split_type='Line') + transformer.wait() assert os.path.exists(os.path.join(str(tmpdir), 'data.csv.out')) diff --git a/tests/integ/test_source_dirs.py b/tests/integ/test_source_dirs.py index 806a03742a..cb288bea3f 100644 --- a/tests/integ/test_source_dirs.py +++ b/tests/integ/test_source_dirs.py @@ -14,9 +14,11 @@ import os -from sagemaker.pytorch.estimator import PyTorch +import tests.integ.local_mode_utils as local_mode_utils from tests.integ import DATA_DIR, PYTHON_VERSION +from sagemaker.pytorch.estimator import PyTorch + def test_source_dirs(tmpdir, sagemaker_local_session): source_dir = os.path.join(DATA_DIR, 'pytorch_source_dirs') @@ -25,17 +27,17 @@ def test_source_dirs(tmpdir, sagemaker_local_session): with open(lib, 'w') as f: f.write('def question(to_anything): return 42') - estimator = PyTorch(entry_point='train.py', role='SageMakerRole', source_dir=source_dir, dependencies=[lib], - py_version=PYTHON_VERSION, train_instance_count=1, train_instance_type='local', + estimator = PyTorch(entry_point='train.py', role='SageMakerRole', source_dir=source_dir, + dependencies=[lib], + py_version=PYTHON_VERSION, train_instance_count=1, + train_instance_type='local', sagemaker_session=sagemaker_local_session) - try: - - estimator.fit() - - predictor = estimator.deploy(initial_instance_count=1, instance_type='local') - - predict_response = predictor.predict([7]) - - assert predict_response == [49] - finally: - estimator.delete_endpoint() + estimator.fit() + + with local_mode_utils.lock(): + try: + predictor = estimator.deploy(initial_instance_count=1, instance_type='local') + predict_response = predictor.predict([7]) + assert predict_response == [49] + finally: + estimator.delete_endpoint() diff --git a/tests/integ/test_tuner.py b/tests/integ/test_tuner.py index 761880f5d4..2fbfb33a45 100644 --- a/tests/integ/test_tuner.py +++ b/tests/integ/test_tuner.py @@ -19,9 +19,12 @@ import sys import time -from botocore.exceptions import ClientError import numpy as np import pytest +from botocore.exceptions import ClientError 
+from tests.integ import DATA_DIR, PYTHON_VERSION, TUNING_DEFAULT_TIMEOUT_MINUTES +from tests.integ.record_set import prepare_record_set_from_local_files +from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name from sagemaker import KMeans, LDA, RandomCutForest from sagemaker.amazon.amazon_estimator import registry @@ -32,19 +35,17 @@ from sagemaker.predictor import json_deserializer from sagemaker.pytorch import PyTorch from sagemaker.tensorflow import TensorFlow -from sagemaker.utils import name_from_base -from sagemaker.tuner import IntegerParameter, ContinuousParameter, CategoricalParameter, HyperparameterTuner, \ - WarmStartConfig, WarmStartTypes, create_transfer_learning_tuner, create_identical_dataset_and_algorithm_tuner -from tests.integ import DATA_DIR, PYTHON_VERSION, TUNING_DEFAULT_TIMEOUT_MINUTES -from tests.integ.record_set import prepare_record_set_from_local_files -from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name +from sagemaker.tuner import IntegerParameter, ContinuousParameter, CategoricalParameter, \ + HyperparameterTuner, \ + WarmStartConfig, WarmStartTypes, create_transfer_learning_tuner, \ + create_identical_dataset_and_algorithm_tuner +from sagemaker.utils import unique_name_from_base DATA_PATH = os.path.join(DATA_DIR, 'iris', 'data') @pytest.fixture(scope='module') def kmeans_train_set(sagemaker_session): - data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} # Load the data into memory as numpy arrays @@ -56,7 +57,6 @@ def kmeans_train_set(sagemaker_session): @pytest.fixture(scope='module') def kmeans_estimator(sagemaker_session): - kmeans = KMeans(role='SageMakerRole', train_instance_count=1, train_instance_type='ml.c4.xlarge', k=10, sagemaker_session=sagemaker_session, base_job_name='tk', @@ -81,10 +81,12 @@ def hyperparameter_ranges(): 'init_method': CategoricalParameter(['kmeans++', 'random'])} -def _tune_and_deploy(kmeans_estimator, kmeans_train_set, sagemaker_session, hyperparameter_ranges=None, job_name=None, +def _tune_and_deploy(kmeans_estimator, kmeans_train_set, sagemaker_session, + hyperparameter_ranges=None, job_name=None, warm_start_config=None): tuner = _tune(kmeans_estimator, kmeans_train_set, - hyperparameter_ranges=hyperparameter_ranges, warm_start_config=warm_start_config, job_name=job_name) + hyperparameter_ranges=hyperparameter_ranges, warm_start_config=warm_start_config, + job_name=job_name) _deploy(kmeans_train_set, sagemaker_session, tuner) @@ -107,10 +109,13 @@ def _tune(kmeans_estimator, kmeans_train_set, tuner=None, with timeout(minutes=TUNING_DEFAULT_TIMEOUT_MINUTES): if not tuner: - tuner = HyperparameterTuner(estimator=kmeans_estimator, objective_metric_name='test:msd', - hyperparameter_ranges=hyperparameter_ranges, objective_type='Minimize', + tuner = HyperparameterTuner(estimator=kmeans_estimator, + objective_metric_name='test:msd', + hyperparameter_ranges=hyperparameter_ranges, + objective_type='Minimize', max_jobs=max_jobs, - max_parallel_jobs=max_parallel_jobs, warm_start_config=warm_start_config) + max_parallel_jobs=max_parallel_jobs, + warm_start_config=warm_start_config) records = kmeans_estimator.record_set(kmeans_train_set[0][:100]) test_record_set = kmeans_estimator.record_set(kmeans_train_set[0][:100], channel='test') @@ -129,7 +134,8 @@ def test_tuning_kmeans(sagemaker_session, kmeans_train_set, kmeans_estimator, hyperparameter_ranges): - _tune_and_deploy(kmeans_estimator, 
kmeans_train_set, sagemaker_session, hyperparameter_ranges=hyperparameter_ranges) + _tune_and_deploy(kmeans_estimator, kmeans_train_set, sagemaker_session, + hyperparameter_ranges=hyperparameter_ranges) @pytest.mark.continuous_testing @@ -137,14 +143,15 @@ def test_tuning_kmeans_identical_dataset_algorithm_tuner_raw(sagemaker_session, kmeans_train_set, kmeans_estimator, hyperparameter_ranges): - parent_tuning_job_name = name_from_base("kmeans-identical", max_length=32, short=True) - child_tuning_job_name = name_from_base("c-kmeans-identical", max_length=32, short=True) + parent_tuning_job_name = unique_name_from_base("kmeans-identical", max_length=32) + child_tuning_job_name = unique_name_from_base("c-kmeans-identical", max_length=32) _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name, hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, max_jobs=1) child_tuner = _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, hyperparameter_ranges=hyperparameter_ranges, - warm_start_config=WarmStartConfig(warm_start_type=WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM, - parents=[parent_tuning_job_name]), max_parallel_jobs=1, + warm_start_config=WarmStartConfig( + warm_start_type=WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM, + parents=[parent_tuning_job_name]), max_parallel_jobs=1, max_jobs=1) child_warm_start_config_response = WarmStartConfig.from_job_desc( @@ -162,14 +169,15 @@ def test_tuning_kmeans_identical_dataset_algorithm_tuner(sagemaker_session, """Tests Identical dataset and algorithm use case with one parent and child job launched with .identical_dataset_and_algorithm_tuner() """ - parent_tuning_job_name = name_from_base("km-iden1-parent", max_length=32, short=True) - child_tuning_job_name = name_from_base("km-iden1-child", max_length=32, short=True) + parent_tuning_job_name = unique_name_from_base("km-iden1-parent", max_length=32) + child_tuning_job_name = unique_name_from_base("km-iden1-child", max_length=32) parent_tuner = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name, hyperparameter_ranges=hyperparameter_ranges) child_tuner = parent_tuner.identical_dataset_and_algorithm_tuner() - _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, max_parallel_jobs=1, + _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, + max_parallel_jobs=1, max_jobs=1) child_warm_start_config_response = WarmStartConfig.from_job_desc( @@ -188,16 +196,19 @@ def test_create_tuning_kmeans_identical_dataset_algorithm_tuner(sagemaker_sessio """Tests Identical dataset and algorithm use case with one parent and child job launched with .create_identical_dataset_and_algorithm_tuner() """ - parent_tuning_job_name = name_from_base("km-iden2-parent", max_length=32, short=True) - child_tuning_job_name = name_from_base("km-iden2-child", max_length=32, short=True) + parent_tuning_job_name = unique_name_from_base("km-iden2-parent", max_length=32) + child_tuning_job_name = unique_name_from_base("km-iden2-child", max_length=32) parent_tuner = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name, - hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, max_jobs=1) + hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, + max_jobs=1) - child_tuner = create_identical_dataset_and_algorithm_tuner(parent=parent_tuner.latest_tuning_job.name, - sagemaker_session=sagemaker_session) + child_tuner = 
create_identical_dataset_and_algorithm_tuner( + parent=parent_tuner.latest_tuning_job.name, + sagemaker_session=sagemaker_session) - _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, max_parallel_jobs=1, + _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, + max_parallel_jobs=1, max_jobs=1) child_warm_start_config_response = WarmStartConfig.from_job_desc( @@ -215,14 +226,16 @@ def test_transfer_learning_tuner(sagemaker_session, """Tests Transfer learning use case with one parent and child job launched with .transfer_learning_tuner() """ - parent_tuning_job_name = name_from_base("km-tran1-parent", max_length=32, short=True) - child_tuning_job_name = name_from_base("km-tran1-child", max_length=32, short=True) + parent_tuning_job_name = unique_name_from_base("km-tran1-parent", max_length=32) + child_tuning_job_name = unique_name_from_base("km-tran1-child", max_length=32) parent_tuner = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name, - hyperparameter_ranges=hyperparameter_ranges, max_jobs=1, max_parallel_jobs=1) + hyperparameter_ranges=hyperparameter_ranges, max_jobs=1, + max_parallel_jobs=1) child_tuner = parent_tuner.transfer_learning_tuner() - _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, max_parallel_jobs=1, + _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner, + max_parallel_jobs=1, max_jobs=1) child_warm_start_config_response = WarmStartConfig.from_job_desc( @@ -240,20 +253,24 @@ def test_create_transfer_learning_tuner(sagemaker_session, hyperparameter_ranges): """Tests Transfer learning use case with two parents and child job launched with create_transfer_learning_tuner() """ - parent_tuning_job_name_1 = name_from_base("km-tran2-parent1", max_length=32, short=True) - parent_tuning_job_name_2 = name_from_base("km-tran2-parent2", max_length=32, short=True) - child_tuning_job_name = name_from_base("km-tran2-child", max_length=32, short=True) + parent_tuning_job_name_1 = unique_name_from_base("km-tran2-parent1", max_length=32) + parent_tuning_job_name_2 = unique_name_from_base("km-tran2-parent2", max_length=32) + child_tuning_job_name = unique_name_from_base("km-tran2-child", max_length=32) parent_tuner_1 = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name_1, - hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, max_jobs=1) + hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, + max_jobs=1) parent_tuner_2 = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name_2, - hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, max_jobs=1) + hyperparameter_ranges=hyperparameter_ranges, max_parallel_jobs=1, + max_jobs=1) + + child_tuner = create_transfer_learning_tuner( + parent=parent_tuner_1.latest_tuning_job.name, + sagemaker_session=sagemaker_session, + estimator=kmeans_estimator, + additional_parents={parent_tuner_2.latest_tuning_job.name}) - child_tuner = create_transfer_learning_tuner(parent=parent_tuner_1.latest_tuning_job.name, - sagemaker_session=sagemaker_session, - estimator=kmeans_estimator, - additional_parents={parent_tuner_2.latest_tuning_job.name}) _tune(kmeans_estimator, kmeans_train_set, job_name=child_tuning_job_name, tuner=child_tuner) child_warm_start_config_response = WarmStartConfig.from_job_desc( @@ -271,11 +288,12 @@ def test_tuning_kmeans_identical_dataset_algorithm_tuner_from_non_terminal_paren 
hyperparameter_ranges): """Tests Identical dataset and algorithm use case with one non terminal parent and child job launched with .identical_dataset_and_algorithm_tuner() """ - parent_tuning_job_name = name_from_base("km-non-term", max_length=32, short=True) - child_tuning_job_name = name_from_base("km-non-term-child", max_length=32, short=True) + parent_tuning_job_name = unique_name_from_base("km-non-term", max_length=32) + child_tuning_job_name = unique_name_from_base("km-non-term-child", max_length=32) parent_tuner = _tune(kmeans_estimator, kmeans_train_set, job_name=parent_tuning_job_name, - hyperparameter_ranges=hyperparameter_ranges, wait_till_terminal=False, max_parallel_jobs=1, + hyperparameter_ranges=hyperparameter_ranges, wait_till_terminal=False, + max_parallel_jobs=1, max_jobs=1) child_tuner = parent_tuner.identical_dataset_and_algorithm_tuner() @@ -296,12 +314,14 @@ def test_tuning_lda(sagemaker_session): feature_num = int(all_records[0].features['values'].float32_tensor.shape[0]) lda = LDA(role='SageMakerRole', train_instance_type='ml.c4.xlarge', num_topics=10, - sagemaker_session=sagemaker_session, base_job_name='test-lda') + sagemaker_session=sagemaker_session) record_set = prepare_record_set_from_local_files(data_path, lda.data_location, - len(all_records), feature_num, sagemaker_session) + len(all_records), feature_num, + sagemaker_session) test_record_set = prepare_record_set_from_local_files(data_path, lda.data_location, - len(all_records), feature_num, sagemaker_session) + len(all_records), feature_num, + sagemaker_session) test_record_set.channel = 'test' # specify which hp you want to optimize over @@ -310,10 +330,12 @@ def test_tuning_lda(sagemaker_session): objective_metric_name = 'test:pwll' tuner = HyperparameterTuner(estimator=lda, objective_metric_name=objective_metric_name, - hyperparameter_ranges=hyperparameter_ranges, objective_type='Maximize', max_jobs=2, + hyperparameter_ranges=hyperparameter_ranges, + objective_type='Maximize', max_jobs=2, max_parallel_jobs=2) - tuner.fit([record_set, test_record_set], mini_batch_size=1) + tuning_job_name = unique_name_from_base('test-lda', max_length=32) + tuner.fit([record_set, test_record_set], mini_batch_size=1, job_name=tuning_job_name) print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name) @@ -336,9 +358,10 @@ def test_stop_tuning_job(sagemaker_session): feature_num = 14 train_input = np.random.rand(1000, feature_num) - rcf = RandomCutForest(role='SageMakerRole', train_instance_count=1, train_instance_type='ml.c4.xlarge', - num_trees=50, num_samples_per_tree=20, sagemaker_session=sagemaker_session, - base_job_name='test-randomcutforest') + rcf = RandomCutForest(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + num_trees=50, num_samples_per_tree=20, + sagemaker_session=sagemaker_session) records = rcf.record_set(train_input) records.distribution = 'FullyReplicated' @@ -351,10 +374,12 @@ def test_stop_tuning_job(sagemaker_session): objective_metric_name = 'test:f1' tuner = HyperparameterTuner(estimator=rcf, objective_metric_name=objective_metric_name, - hyperparameter_ranges=hyperparameter_ranges, objective_type='Maximize', max_jobs=2, + hyperparameter_ranges=hyperparameter_ranges, + objective_type='Maximize', max_jobs=2, max_parallel_jobs=2) - tuner.fit([records, test_records]) + tuning_job_name = unique_name_from_base('test-randomcutforest', max_length=32) + tuner.fit([records, test_records], tuning_job_name) time.sleep(15) @@ -364,7 +389,7 @@ def 
test_stop_tuning_job(sagemaker_session): tuner.stop_tuning_job() - desc = tuner.latest_tuning_job.sagemaker_session.sagemaker_client\ + desc = tuner.latest_tuning_job.sagemaker_session.sagemaker_client \ .describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=latest_tuning_job_name) assert desc['HyperParameterTuningJobStatus'] == 'Stopping' @@ -381,22 +406,25 @@ def test_tuning_mxnet(sagemaker_session): train_instance_count=1, train_instance_type='ml.m4.xlarge', framework_version='1.2.1', - sagemaker_session=sagemaker_session, - base_job_name='tune-mxnet') + sagemaker_session=sagemaker_session) hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.01, 0.2)} objective_metric_name = 'Validation-accuracy' - metric_definitions = [{'Name': 'Validation-accuracy', 'Regex': 'Validation-accuracy=([0-9\\.]+)'}] - tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions, + metric_definitions = [ + {'Name': 'Validation-accuracy', 'Regex': 'Validation-accuracy=([0-9\\.]+)'}] + tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, + metric_definitions, max_jobs=4, max_parallel_jobs=2) train_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), key_prefix='integ-test-data/mxnet_mnist/train') test_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), key_prefix='integ-test-data/mxnet_mnist/test') - tuner.fit({'train': train_input, 'test': test_input}) - print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name) + tuning_job_name = unique_name_from_base('tune-mxnet', max_length=32) + tuner.fit({'train': train_input, 'test': test_input}, job_name=tuning_job_name) + + print('Started hyperparameter tuning job with name:' + tuning_job_name) time.sleep(15) tuner.wait() @@ -421,8 +449,7 @@ def test_tuning_tf(sagemaker_session): hyperparameters={'input_tensor_name': 'inputs'}, train_instance_count=1, train_instance_type='ml.c4.xlarge', - sagemaker_session=sagemaker_session, - base_job_name='tune-tf') + sagemaker_session=sagemaker_session) inputs = sagemaker_session.upload_data(path=DATA_PATH, key_prefix='integ-test-data/tf_iris') hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.05, 0.2)} @@ -430,12 +457,14 @@ def test_tuning_tf(sagemaker_session): objective_metric_name = 'loss' metric_definitions = [{'Name': 'loss', 'Regex': 'loss = ([0-9\\.]+)'}] - tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions, + tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, + metric_definitions, objective_type='Minimize', max_jobs=2, max_parallel_jobs=2) - tuner.fit(inputs) + tuning_job_name = unique_name_from_base('tune-tf', max_length=32) + tuner.fit(inputs, job_name=tuning_job_name) - print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name) + print('Started hyperparameter tuning job with name:' + tuning_job_name) time.sleep(15) tuner.wait() @@ -476,14 +505,17 @@ def test_tuning_chainer(sagemaker_session): objective_metric_name = 'Validation-accuracy' metric_definitions = [ - {'Name': 'Validation-accuracy', 'Regex': r'\[J1\s+\d\.\d+\s+\d\.\d+\s+\d\.\d+\s+(\d\.\d+)'}] + {'Name': 'Validation-accuracy', + 'Regex': r'\[J1\s+\d\.\d+\s+\d\.\d+\s+\d\.\d+\s+(\d\.\d+)'}] - tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions, + tuner = HyperparameterTuner(estimator, 
objective_metric_name, hyperparameter_ranges, + metric_definitions, max_jobs=2, max_parallel_jobs=2) - tuner.fit({'train': train_input, 'test': test_input}) + tuning_job_name = unique_name_from_base('chainer', max_length=32) + tuner.fit({'train': train_input, 'test': test_input}, job_name=tuning_job_name) - print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name) + print('Started hyperparameter tuning job with name:' + tuning_job_name) time.sleep(15) tuner.wait() @@ -517,24 +549,28 @@ def test_attach_tuning_pytorch(sagemaker_session): with timeout(minutes=TUNING_DEFAULT_TIMEOUT_MINUTES): objective_metric_name = 'evaluation-accuracy' - metric_definitions = [{'Name': 'evaluation-accuracy', 'Regex': r'Overall test accuracy: (\d+)'}] + metric_definitions = [ + {'Name': 'evaluation-accuracy', 'Regex': r'Overall test accuracy: (\d+)'}] hyperparameter_ranges = {'batch-size': IntegerParameter(50, 100)} - tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions, + tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, + metric_definitions, max_jobs=2, max_parallel_jobs=2) - training_data = estimator.sagemaker_session.upload_data(path=os.path.join(mnist_dir, 'training'), - key_prefix='integ-test-data/pytorch_mnist/training') - tuner.fit({'training': training_data}) + training_data = estimator.sagemaker_session.upload_data( + path=os.path.join(mnist_dir, 'training'), + key_prefix='integ-test-data/pytorch_mnist/training') - tuning_job_name = tuner.latest_tuning_job.name + tuning_job_name = unique_name_from_base('pytorch', max_length=32) + tuner.fit({'training': training_data}, job_name=tuning_job_name) print('Started hyperparameter tuning job with name:' + tuning_job_name) time.sleep(15) tuner.wait() - attached_tuner = HyperparameterTuner.attach(tuning_job_name, sagemaker_session=sagemaker_session) + attached_tuner = HyperparameterTuner.attach(tuning_job_name, + sagemaker_session=sagemaker_session) best_training_job = tuner.best_training_job() with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session): predictor = attached_tuner.deploy(1, 'ml.c4.xlarge') @@ -577,7 +613,7 @@ def test_tuning_byo_estimator(sagemaker_session): estimator = Estimator(image_name=image_name, role='SageMakerRole', train_instance_count=1, train_instance_type='ml.c4.xlarge', - sagemaker_session=sagemaker_session, base_job_name='test-byo') + sagemaker_session=sagemaker_session) estimator.set_hyperparameters(num_factors=10, feature_dim=784, @@ -586,12 +622,14 @@ def test_tuning_byo_estimator(sagemaker_session): hyperparameter_ranges = {'mini_batch_size': IntegerParameter(100, 200)} - tuner = HyperparameterTuner(estimator=estimator, base_tuning_job_name='byo', + tuner = HyperparameterTuner(estimator=estimator, objective_metric_name='test:binary_classification_accuracy', hyperparameter_ranges=hyperparameter_ranges, max_jobs=2, max_parallel_jobs=2) - tuner.fit({'train': s3_train_data, 'test': s3_train_data}, include_cls_metadata=False) + tuner.fit({'train': s3_train_data, 'test': s3_train_data}, + include_cls_metadata=False, + job_name=unique_name_from_base('byo', 32)) print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 0a1521fb54..265090c870 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -16,6 +16,7 @@ from datetime import datetime import os +import re import time import pytest @@ 
-24,7 +25,7 @@ import sagemaker from sagemaker.utils import get_config_value, name_from_base,\ to_str, DeferredError, extract_name_from_job_arn, secondary_training_status_changed,\ - secondary_training_status_message + secondary_training_status_message, unique_name_from_base NAME = 'base_name' @@ -78,6 +79,15 @@ def test_name_from_base_short(sagemaker_short_timestamp): assert sagemaker_short_timestamp.called_once +def test_unique_name_from_base(): + assert re.match(r'base-\d{10}-[a-f0-9]{4}', unique_name_from_base('base')) + + +def test_unique_name_from_base_truncated(): + assert re.match(r'real-\d{10}-[a-f0-9]{4}', + unique_name_from_base('really-long-name', max_length=20)) + + def test_to_str_with_native_string(): value = 'some string' assert to_str(value) == value diff --git a/tox.ini b/tox.ini index 0f1963faaa..6402496c88 100644 --- a/tox.ini +++ b/tox.ini @@ -71,4 +71,4 @@ basepython = python3 deps = pylint==2.1.1 commands = - python -m pylint --rcfile=.pylintrc src/sagemaker + python -m pylint --rcfile=.pylintrc -j 0 src/sagemaker
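
A minimal sketch of how the two new test helpers introduced in this change are meant to be used together. `unique_name_from_base` (added to `src/sagemaker/utils.py`) and `lock()` (added in `tests/integ/local_mode_utils.py`) are taken from the diff above; the `serve_and_predict` helper, its `model` argument, and the input shape are illustrative placeholders, not part of this change.

from __future__ import absolute_import

import numpy

import tests.integ.local_mode_utils as local_mode_utils
from sagemaker.utils import unique_name_from_base

# unique_name_from_base appends a Unix timestamp plus a random 4-digit hex
# suffix, so names generated by concurrently started test runs are unlikely
# to collide (name_from_base(..., short=True) carried only a timestamp).
job_name = unique_name_from_base('km-tran1-parent', max_length=32)
# e.g. 'km-tran1-parent-1542312000-a1b2' -- never longer than max_length


def serve_and_predict(model):
    # `model` stands in for any deployable model or estimator used by the
    # Local Mode tests (a placeholder for this sketch). Local Mode containers
    # all serve on the same host port, so deployment is wrapped in the file
    # lock from local_mode_utils: lock() takes an exclusive fcntl lock and
    # sleeps a few seconds before releasing it, so the port is free again
    # for the next test.
    predictor = None
    with local_mode_utils.lock():
        try:
            predictor = model.deploy(initial_instance_count=1, instance_type='local')
            return predictor.predict(numpy.zeros(shape=(1, 1, 28, 28)))
        finally:
            if predictor:
                predictor.delete_endpoint()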