diff --git a/.gitignore b/.gitignore index b7f871ce2c..35800d01fb 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ doc/_templates venv/ *~ .pytest_cache/ +*.swp diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6a64ecffe7..02b0fae5fe 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,12 +2,15 @@ CHANGELOG ========= -1.3.dev1 -======== +1.4.0 +===== * bug-fix: Remove __all__ and add noqa in __init__ * bug-fix: Estimators: Change max_iterations hyperparameter key for KMeans * bug-fix: Estimators: Remove unused argument job_details for ``EstimatorBase.attach()`` +* bug-fix: Local Mode: Show logs in Jupyter notebooks +* feature: HyperparameterTuner: Add support for hyperparameter tuning jobs +* feature: Analytics: Add functions for metrics in Training and Hyperparameter Tuning jobs * feature: Estimators: add support for tagging training jobs 1.3.0 diff --git a/README.rst b/README.rst index 1661042685..12059beab1 100644 --- a/README.rst +++ b/README.rst @@ -48,7 +48,7 @@ You can install from source by cloning this repository and issuing a pip install git clone https://github.com/aws/sagemaker-python-sdk.git python setup.py sdist - pip install dist/sagemaker-1.3.0.tar.gz + pip install dist/sagemaker-1.4.0.tar.gz Supported Python versions ~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/setup.py b/setup.py index abf5823e70..2d7143793c 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def read(fname): setup(name="sagemaker", - version="1.3.0", + version="1.4.0", description="Open source library for training and deploying models on Amazon SageMaker.", packages=find_packages('src'), package_dir={'': 'src'}, @@ -49,7 +49,7 @@ def read(fname): extras_require={ 'test': ['tox', 'flake8', 'pytest', 'pytest-cov', 'pytest-xdist', - 'mock', 'tensorflow>=1.3.0', 'contextlib2', 'awslogs']}, + 'mock', 'tensorflow>=1.3.0', 'contextlib2', 'awslogs', 'pandas']}, entry_points={ 'console_scripts': ['sagemaker=sagemaker.cli.main:main'], diff --git a/src/sagemaker/__init__.py b/src/sagemaker/__init__.py index 3c4f80d6b6..5d663d84b6 100644 --- a/src/sagemaker/__init__.py +++ b/src/sagemaker/__init__.py @@ -23,6 +23,7 @@ from sagemaker.amazon.randomcutforest import (RandomCutForest, RandomCutForestModel, # noqa: F401 RandomCutForestPredictor) +from sagemaker.analytics import TrainingJobAnalytics, HyperparameterTuningJobAnalytics # noqa: F401 from sagemaker.local.local_session import LocalSession # noqa: F401 from sagemaker.model import Model # noqa: F401 diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index ec72a0b881..6fc94da2f6 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -19,7 +19,7 @@ from sagemaker.amazon import validation from sagemaker.amazon.hyperparameter import Hyperparameter as hp # noqa from sagemaker.amazon.common import write_numpy_to_dense_tensor -from sagemaker.estimator import EstimatorBase +from sagemaker.estimator import EstimatorBase, _TrainingJob from sagemaker.session import s3_input from sagemaker.utils import sagemaker_timestamp @@ -92,11 +92,38 @@ def _prepare_init_params_from_job_description(cls, job_details): del init_params['image'] return init_params - def fit(self, records, mini_batch_size=None, **kwargs): + def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): + """Set hyperparameters needed for training. + + Args: + * records (:class:`~RecordSet`): The records to train this ``Estimator`` on. 
+ * mini_batch_size (int or None): The size of each mini-batch to use when training. If ``None``, a + default value will be used. + * job_name (str): Name of the training job to be created. If not specified, one is generated, + using the base name given to the constructor if applicable. + """ + super(AmazonAlgorithmEstimatorBase, self)._prepare_for_training(job_name=job_name) + + feature_dim = None + + if isinstance(records, list): + for record in records: + if record.channel == 'train': + feature_dim = record.feature_dim + break + if feature_dim is None: + raise ValueError('Must provide train channel.') + else: + feature_dim = records.feature_dim + + self.feature_dim = feature_dim + self.mini_batch_size = mini_batch_size + + def fit(self, records, mini_batch_size=None, wait=True, logs=True, job_name=None): """Fit this Estimator on serialized Record objects, stored in S3. ``records`` should be an instance of :class:`~RecordSet`. This defines a collection of - s3 data files to train this ``Estimator`` on. + S3 data files to train this ``Estimator`` on. Training data is expected to be encoded as dense or sparse vectors in the "values" feature on each Record. If the data is labeled, the label is expected to be encoded as a list of @@ -110,15 +137,19 @@ def fit(self, records, mini_batch_size=None, **kwargs): Args: records (:class:`~RecordSet`): The records to train this ``Estimator`` on - mini_batch_size (int or None): The size of each mini-batch to use when training. If None, a + mini_batch_size (int or None): The size of each mini-batch to use when training. If ``None``, a default value will be used. + wait (bool): Whether the call should wait until the job completes (default: True). + logs (bool): Whether to show the logs produced by the job. + Only meaningful when wait is True (default: True). + job_name (str): Training job name. If not specified, the estimator generates a default job name, + based on the training image name and current timestamp. """ - self.feature_dim = records.feature_dim - self.mini_batch_size = mini_batch_size + self._prepare_for_training(records, job_name=job_name, mini_batch_size=mini_batch_size) - data = {records.channel: s3_input(records.s3_data, distribution='ShardedByS3Key', - s3_data_type=records.s3_data_type)} - super(AmazonAlgorithmEstimatorBase, self).fit(data, **kwargs) + self.latest_training_job = _TrainingJob.start_new(self, records) + if wait: + self.latest_training_job.wait(logs=logs) def record_set(self, train, labels=None, channel="train"): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. 
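The reworked ``fit`` above now accepts ``wait``, ``logs``, and ``job_name`` directly, and ``records`` may be a single ``RecordSet`` or a list of ``RecordSet`` objects (one per channel). A minimal usage sketch of the new signature follows; the role ARN, instance settings, and data below are illustrative assumptions, not values taken from this change::

    import numpy as np

    from sagemaker import KMeans

    # Hypothetical role and instance settings; substitute values for your account.
    kmeans = KMeans(role='arn:aws:iam::123456789012:role/SageMakerRole',
                    train_instance_count=1, train_instance_type='ml.c4.xlarge', k=10)

    # record_set() uploads the matrix to S3 and wraps it in a RecordSet for the 'train' channel.
    train_records = kmeans.record_set(np.random.rand(1000, 50).astype('float32'))

    # fit() now delegates setup to _prepare_for_training() and takes wait/logs/job_name explicitly.
    kmeans.fit(train_records, wait=True, logs=True, job_name='kmeans-example-job')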
@@ -180,6 +211,14 @@ def __repr__(self): """Return an unambiguous representation of this RecordSet""" return str((RecordSet, self.__dict__)) + def data_channel(self): + """Return a dictionary to represent the training data in a channel for use with ``fit()``.""" + return {self.channel: self.records_s3_input()} + + def records_s3_input(self): + """Return an s3_input to represent the training data.""" + return s3_input(self.s3_data, distribution='ShardedByS3Key', s3_data_type=self.s3_data_type) + def _build_shards(num_shards, array): if num_shards < 1: diff --git a/src/sagemaker/amazon/hyperparameter.py b/src/sagemaker/amazon/hyperparameter.py index 31014b7f2b..ad25a84d14 100644 --- a/src/sagemaker/amazon/hyperparameter.py +++ b/src/sagemaker/amazon/hyperparameter.py @@ -46,7 +46,6 @@ def validate(self, value): raise ValueError(error_message) def __get__(self, obj, objtype): - """Return the value of this hyperparameter""" if '_hyperparameters' not in dir(obj) or self.name not in obj._hyperparameters: raise AttributeError() return obj._hyperparameters[self.name] diff --git a/src/sagemaker/amazon/kmeans.py b/src/sagemaker/amazon/kmeans.py index 9c9b234254..6b2820312b 100644 --- a/src/sagemaker/amazon/kmeans.py +++ b/src/sagemaker/amazon/kmeans.py @@ -108,8 +108,8 @@ def create_model(self): s3 model data produced by this Estimator.""" return KMeansModel(self.model_data, self.role, self.sagemaker_session) - def fit(self, records, mini_batch_size=5000, **kwargs): - super(KMeans, self).fit(records, mini_batch_size, **kwargs) + def _prepare_for_training(self, records, mini_batch_size=5000, job_name=None): + super(KMeans, self)._prepare_for_training(records, mini_batch_size=mini_batch_size, job_name=job_name) def hyperparameters(self): """Return the SageMaker hyperparameters for training this KMeans Estimator""" diff --git a/src/sagemaker/amazon/lda.py b/src/sagemaker/amazon/lda.py index 934c0b3a48..9cc6dd4fa6 100644 --- a/src/sagemaker/amazon/lda.py +++ b/src/sagemaker/amazon/lda.py @@ -93,11 +93,12 @@ def create_model(self): return LDAModel(self.model_data, self.role, sagemaker_session=self.sagemaker_session) - def fit(self, records, mini_batch_size, **kwargs): + def _prepare_for_training(self, records, mini_batch_size, job_name=None): # mini_batch_size is required, prevent explicit calls with None if mini_batch_size is None: raise ValueError("mini_batch_size must be set") - super(LDA, self).fit(records, mini_batch_size, **kwargs) + + super(LDA, self)._prepare_for_training(records, mini_batch_size=mini_batch_size, job_name=job_name) class LDAPredictor(RealTimePredictor): diff --git a/src/sagemaker/amazon/linear_learner.py b/src/sagemaker/amazon/linear_learner.py index 21063b6dc0..ca2d8fb7fa 100644 --- a/src/sagemaker/amazon/linear_learner.py +++ b/src/sagemaker/amazon/linear_learner.py @@ -228,12 +228,23 @@ def create_model(self): return LinearLearnerModel(self.model_data, self.role, self.sagemaker_session) - def fit(self, records, mini_batch_size=None, **kwargs): + def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): + num_records = None + if isinstance(records, list): + for record in records: + if record.channel == 'train': + num_records = record.num_records + break + if num_records is None: + raise ValueError('Must provide train channel.') + else: + num_records = records.num_records + + # mini_batch_size can't be greater than number of records or training job fails default_mini_batch_size = min(self.DEFAULT_MINI_BATCH_SIZE, - max(1, int(records.num_records /
self.train_instance_count))) - use_mini_batch_size = mini_batch_size or default_mini_batch_size - super(LinearLearner, self).fit(records, use_mini_batch_size, **kwargs) + max(1, int(num_records / self.train_instance_count))) + mini_batch_size = mini_batch_size or default_mini_batch_size + super(LinearLearner, self)._prepare_for_training(records, mini_batch_size=mini_batch_size, job_name=job_name) class LinearLearnerPredictor(RealTimePredictor): diff --git a/src/sagemaker/amazon/ntm.py b/src/sagemaker/amazon/ntm.py index 3fa8b9fecb..5ec1566fde 100644 --- a/src/sagemaker/amazon/ntm.py +++ b/src/sagemaker/amazon/ntm.py @@ -113,10 +113,10 @@ def create_model(self): return NTMModel(self.model_data, self.role, sagemaker_session=self.sagemaker_session) - def fit(self, records, mini_batch_size=None, **kwargs): + def _prepare_for_training(self, records, mini_batch_size, job_name=None): if mini_batch_size is not None and (mini_batch_size < 1 or mini_batch_size > 10000): raise ValueError("mini_batch_size must be in [1, 10000]") - super(NTM, self).fit(records, mini_batch_size, **kwargs) + super(NTM, self)._prepare_for_training(records, mini_batch_size=mini_batch_size, job_name=job_name) class NTMPredictor(RealTimePredictor): diff --git a/src/sagemaker/amazon/pca.py b/src/sagemaker/amazon/pca.py index 1afe50752e..d62f18c843 100644 --- a/src/sagemaker/amazon/pca.py +++ b/src/sagemaker/amazon/pca.py @@ -92,12 +92,33 @@ def create_model(self): return PCAModel(self.model_data, self.role, sagemaker_session=self.sagemaker_session) - def fit(self, records, mini_batch_size=None, **kwargs): + def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): + """Set hyperparameters needed for training. + + Args: + * records (:class:`~RecordSet`): The records to train this ``Estimator`` on. + * mini_batch_size (int or None): The size of each mini-batch to use when training. If ``None``, a + default value will be used. + * job_name (str): Name of the training job to be created. If not specified, one is generated, + using the base name given to the constructor if applicable. 
+ """ + num_records = None + if isinstance(records, list): + for record in records: + if record.channel == 'train': + num_records = record.num_records + break + if num_records is None: + raise ValueError('Must provide train channel.') + else: + num_records = records.num_records + # mini_batch_size is a required parameter default_mini_batch_size = min(self.DEFAULT_MINI_BATCH_SIZE, - max(1, int(records.num_records / self.train_instance_count))) + max(1, int(num_records / self.train_instance_count))) use_mini_batch_size = mini_batch_size or default_mini_batch_size - super(PCA, self).fit(records, use_mini_batch_size, **kwargs) + + super(PCA, self)._prepare_for_training(records=records, mini_batch_size=use_mini_batch_size, job_name=job_name) class PCAPredictor(RealTimePredictor): diff --git a/src/sagemaker/amazon/randomcutforest.py b/src/sagemaker/amazon/randomcutforest.py index 64730756ce..01b27f475a 100644 --- a/src/sagemaker/amazon/randomcutforest.py +++ b/src/sagemaker/amazon/randomcutforest.py @@ -87,13 +87,13 @@ def create_model(self): return RandomCutForestModel(self.model_data, self.role, sagemaker_session=self.sagemaker_session) - def fit(self, records, mini_batch_size=None, **kwargs): + def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): if mini_batch_size is None: - mini_batch_size = RandomCutForest.MINI_BATCH_SIZE - elif mini_batch_size != RandomCutForest.MINI_BATCH_SIZE: - raise ValueError("Random Cut Forest uses a fixed mini_batch_size of {}" - .format(RandomCutForest.MINI_BATCH_SIZE)) - super(RandomCutForest, self).fit(records, mini_batch_size, **kwargs) + mini_batch_size = self.MINI_BATCH_SIZE + elif mini_batch_size != self.MINI_BATCH_SIZE: + raise ValueError("Random Cut Forest uses a fixed mini_batch_size of {}".format(self.MINI_BATCH_SIZE)) + + super(RandomCutForest, self)._prepare_for_training(records, mini_batch_size=mini_batch_size, job_name=job_name) class RandomCutForestPredictor(RealTimePredictor): diff --git a/src/sagemaker/analytics.py b/src/sagemaker/analytics.py new file mode 100644 index 0000000000..00245b792c --- /dev/null +++ b/src/sagemaker/analytics.py @@ -0,0 +1,294 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import print_function, absolute_import + +from abc import ABCMeta, abstractmethod +from collections import defaultdict +import datetime +import logging + +from six import with_metaclass + +from sagemaker.session import Session +from sagemaker.utils import DeferredError + +try: + import pandas as pd +except ImportError as e: + logging.warning("pandas failed to import. Analytics features will be impaired or broken.") + # Any subsequent attempt to use pandas will raise the ImportError + pd = DeferredError(e) + + +class AnalyticsMetricsBase(with_metaclass(ABCMeta, object)): + """Base class for tuning job or training job analytics classes. + Understands common functionality like persistence and caching. + """ + + def export_csv(self, filename): + """Persists the analytics dataframe to a file. 
+ + Args: + filename (str): The name of the file to save to. + """ + self.dataframe().to_csv(filename) + + def dataframe(self, force_refresh=False): + """A pandas dataframe with lots of interesting results about this object. + Created by calling SageMaker List and Describe APIs and converting them into + a convenient tabular summary. + + Args: + force_refresh (bool): Set to True to fetch the latest data from SageMaker API. + """ + if force_refresh: + self.clear_cache() + if self._dataframe is None: + self._dataframe = self._fetch_dataframe() + return self._dataframe + + @abstractmethod + def _fetch_dataframe(self): + """Sub-class must calculate the dataframe and return it. + """ + pass + + def clear_cache(self): + """Clears the object of all local caches of API methods, so + that the next time any properties are accessed they will be refreshed from + the service. + """ + self._dataframe = None + + +class HyperparameterTuningJobAnalytics(AnalyticsMetricsBase): + """Fetches results about this tuning job and makes them accessible for analytics. + """ + + def __init__(self, hyperparameter_tuning_job_name, sagemaker_session=None): + """Initialize a ``HyperparameterTuningJobAnalytics`` instance. + + Args: + hyperparameter_tuning_job_name (str): name of the HyperparameterTuningJob to + analyze. + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with + Amazon SageMaker APIs and any other AWS services needed. If not specified, one is created + using the default AWS configuration chain. + """ + sagemaker_session = sagemaker_session or Session() + self._sage_client = sagemaker_session.sagemaker_client + self._tuning_job_name = hyperparameter_tuning_job_name + self.clear_cache() + + @property + def name(self): + """Name of the HyperparameterTuningJob being analyzed. + """ + return self._tuning_job_name + + def __repr__(self): + return "<HyperparameterTuningJobAnalytics for %s>" % self.name + + def clear_cache(self): + """Clears the object of all local caches of API methods. + """ + super(HyperparameterTuningJobAnalytics, self).clear_cache() + self._tuning_job_describe_result = None + self._training_job_summaries = None + + def _fetch_dataframe(self): + """Returns a pandas dataframe with all the training jobs, their + hyperparameters, results, and metadata about the training jobs. + Includes a column to indicate whether a training job was the best seen so far. + """ + def reshape(training_summary): + # Helper method to reshape a single training job summary into a dataframe record + out = {} + for k, v in training_summary['TunedHyperParameters'].items(): + # Something (bokeh?) gets confused with ints so convert to float + try: + v = float(v) + except (TypeError, ValueError): + pass + out[k] = v + out['TrainingJobName'] = training_summary['TrainingJobName'] + out['TrainingJobStatus'] = training_summary['TrainingJobStatus'] + out['FinalObjectiveValue'] = training_summary.get('FinalHyperParameterTuningJobObjectiveMetric', + {}).get('Value') + + start_time = training_summary.get('TrainingStartTime', None) + end_time = training_summary.get('TrainingEndTime', None) + out['TrainingStartTime'] = start_time + out['TrainingEndTime'] = end_time + if start_time and end_time: + out['TrainingElapsedTimeSeconds'] = (end_time - start_time).total_seconds() + return out + # Run that helper over all the summaries. + df = pd.DataFrame([reshape(tjs) for tjs in self.training_job_summaries()]) + return df + + @property + def tuning_ranges(self): + """A dict describing the ranges of all tuned hyperparameters.
Each key is the name of a tuned hyperparameter; each value is the corresponding range. + """ + out = {} + for _, ranges in self.description()['HyperParameterTuningJobConfig']['ParameterRanges'].items(): + for param in ranges: + out[param['Name']] = param + return out + + def description(self, force_refresh=False): + """Response to DescribeHyperParameterTuningJob. + + Args: + force_refresh (bool): Set to True to fetch the latest data from SageMaker API. + """ + if force_refresh: + self.clear_cache() + if not self._tuning_job_describe_result: + self._tuning_job_describe_result = self._sage_client.describe_hyper_parameter_tuning_job( + HyperParameterTuningJobName=self.name + ) + return self._tuning_job_describe_result + + def training_job_summaries(self, force_refresh=False): + """A list of everything (paginated) from ListTrainingJobsForTuningJob. + + Args: + force_refresh (bool): Set to True to fetch the latest data from SageMaker API. + """ + if force_refresh: + self.clear_cache() + if self._training_job_summaries is not None: + return self._training_job_summaries + output = [] + next_args = {} + for count in range(100): + logging.debug("Calling list_training_jobs_for_hyper_parameter_tuning_job %d" % count) + raw_result = self._sage_client.list_training_jobs_for_hyper_parameter_tuning_job( + HyperParameterTuningJobName=self.name, MaxResults=100, **next_args + ) + new_output = raw_result['TrainingJobSummaries'] + output.extend(new_output) + logging.debug("Got %d more TrainingJobs. Total so far: %d" % (len(new_output), len(output))) + if ('NextToken' in raw_result) and (len(new_output) > 0): + next_args['NextToken'] = raw_result['NextToken'] + else: + break + self._training_job_summaries = output + return output + + +class TrainingJobAnalytics(AnalyticsMetricsBase): + """Fetches training curve data from CloudWatch Metrics for a specific training job. + """ + + CLOUDWATCH_NAMESPACE = '/aws/sagemaker/HyperParameterTuningJobs' + + def __init__(self, training_job_name, metric_names, sagemaker_session=None): + """Initialize a ``TrainingJobAnalytics`` instance. + + Args: + training_job_name (str): name of the TrainingJob to analyze. + metric_names (list): string names of all the metrics to collect for this training job. + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with + Amazon SageMaker APIs and any other AWS services needed. If not specified, one is created + using the default AWS configuration chain. + """ + sagemaker_session = sagemaker_session or Session() + self._sage_client = sagemaker_session.sagemaker_client + self._cloudwatch = sagemaker_session.boto_session.client('cloudwatch') + self._training_job_name = training_job_name + self._metric_names = metric_names + self.clear_cache() + + @property + def name(self): + """Name of the TrainingJob being analyzed. + """ + return self._training_job_name + + def __repr__(self): + return "<TrainingJobAnalytics for %s>" % self.name + + def clear_cache(self): + """Clears the object of all local caches of API methods, so + that the next time any properties are accessed they will be refreshed from + the service.
+ """ + super(TrainingJobAnalytics, self).clear_cache() + self._data = defaultdict(list) + self._time_interval = self._determine_timeinterval() + + def _determine_timeinterval(self): + """Returns a dict with two datetime objects, start_time and end_time + covering the interval of the training job + """ + description = self._sage_client.describe_training_job(TrainingJobName=self.name) + start_time = description[u'TrainingStartTime'] # datetime object + end_time = description.get(u'TrainingEndTime', datetime.datetime.utcnow()) + return { + 'start_time': start_time, + 'end_time': end_time, + } + + def _fetch_dataframe(self): + for metric_name in self._metric_names: + self._fetch_metric(metric_name) + return pd.DataFrame(self._data) + + def _fetch_metric(self, metric_name): + """Fetches all the values of a named metric, and adds them to _data + """ + request = { + 'Namespace': self.CLOUDWATCH_NAMESPACE, + 'MetricName': metric_name, + 'Dimensions': [ + { + 'Name': 'TrainingJobName', + 'Value': self.name + } + ], + 'StartTime': self._time_interval['start_time'], + 'EndTime': self._time_interval['end_time'], + 'Period': 60, + 'Statistics': ['Average'], + } + raw_cwm_data = self._cloudwatch.get_metric_statistics(**request)['Datapoints'] + if len(raw_cwm_data) == 0: + logging.warning("Warning: No metrics called %s found" % metric_name) + return + + # Process data: normalize to starting time, and sort. + base_time = min(raw_cwm_data, key=lambda pt: pt['Timestamp'])['Timestamp'] + all_xy = [] + for pt in raw_cwm_data: + y = pt['Average'] + x = (pt['Timestamp'] - base_time).total_seconds() + all_xy.append([x, y]) + all_xy = sorted(all_xy, key=lambda x: x[0]) + + # Store everything in _data to make a dataframe from + for elapsed_seconds, value in all_xy: + self._add_single_metric(elapsed_seconds, metric_name, value) + + def _add_single_metric(self, timestamp, metric_name, value): + """Stores a single metric in the _data dict which can be + converted to a dataframe. + """ + # note that this method is built this way to make it possible to + # support live-refreshing charts in Bokeh at some point in the future. + self._data['timestamp'].append(timestamp) + self._data['metric_name'].append(metric_name) + self._data['value'].append(value) diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index ed4581aa06..85ff100b3c 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -17,15 +17,15 @@ import os from abc import ABCMeta from abc import abstractmethod -from six import with_metaclass, string_types +from six import with_metaclass +from sagemaker.analytics import TrainingJobAnalytics from sagemaker.fw_utils import tar_and_upload_dir, parse_s3_url, UploadedCode, validate_source_dir -from sagemaker.local import LocalSession, file_input - +from sagemaker.job import _Job +from sagemaker.local import LocalSession from sagemaker.model import Model from sagemaker.model import (SCRIPT_PARAM_NAME, DIR_PARAM_NAME, CLOUDWATCH_METRICS_PARAM_NAME, CONTAINER_LOG_LEVEL_PARAM_NAME, JOB_NAME_PARAM_NAME, SAGEMAKER_REGION_PARAM_NAME) - from sagemaker.predictor import RealTimePredictor from sagemaker.session import Session from sagemaker.session import s3_input @@ -121,6 +121,29 @@ def hyperparameters(self): """ pass + def _prepare_for_training(self, job_name=None): + """Set any values in the estimator that need to be set before training. + + Args: + * job_name (str): Name of the training job to be created. 
If not specified, one is generated, + using the base name given to the constructor if applicable. + """ + if job_name is not None: + self._current_job_name = job_name + else: + # honor supplied base_job_name or generate it + base_name = self.base_job_name or base_name_from_image(self.train_image()) + self._current_job_name = name_from_base(base_name) + + # if output_path was specified we use it otherwise initialize here. + # For Local Mode with local_code=True we don't need an explicit output_path + if self.output_path is None: + local_code = get_config_value('local.local_code', self.sagemaker_session.config) + if self.sagemaker_session.local_mode and local_code: + self.output_path = '' + else: + self.output_path = 's3://{}/'.format(self.sagemaker_session.default_bucket()) + def fit(self, inputs, wait=True, logs=True, job_name=None): """Train a model using the input training dataset. @@ -149,22 +172,7 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): job_name (str): Training job name. If not specified, the estimator generates a default job name, based on the training image name and current timestamp. """ - - if job_name is not None: - self._current_job_name = job_name - else: - # make sure the job name is unique for each invocation, honor supplied base_job_name or generate it - base_name = self.base_job_name or base_name_from_image(self.train_image()) - self._current_job_name = name_from_base(base_name) - - # if output_path was specified we use it otherwise initialize here. - # For Local Mode with local_code=True we don't need an explicit output_path - if self.output_path is None: - local_code = get_config_value('local.local_code', self.sagemaker_session.config) - if self.sagemaker_session.local_mode and local_code: - self.output_path = '' - else: - self.output_path = 's3://{}/'.format(self.sagemaker_session.default_bucket()) + self._prepare_for_training(job_name=job_name) self.latest_training_job = _TrainingJob.start_new(self, inputs) if wait: @@ -309,11 +317,18 @@ def delete_endpoint(self): raise ValueError('Endpoint was not created yet') self.sagemaker_session.delete_endpoint(self.latest_training_job.name) + @property + def training_job_analytics(self): + """Returns a TrainingJobAnalytics object for the current training job. + """ + if self._current_job_name is None: + raise ValueError('Estimator is not associated with a TrainingJob') + return TrainingJobAnalytics(self._current_job_name) + -class _TrainingJob(object): +class _TrainingJob(_Job): def __init__(self, sagemaker_session, training_job_name): - self.sagemaker_session = sagemaker_session - self.job_name = training_job_name + super(_TrainingJob, self).__init__(sagemaker_session, training_job_name) @classmethod def start_new(cls, estimator, inputs): @@ -324,7 +339,8 @@ def start_new(cls, estimator, inputs): inputs (str): Parameters used when called :meth:`~sagemaker.estimator.EstimatorBase.fit`. Returns: - sagemaker.estimator.Framework: Constructed object that captures all information about the started job. + sagemaker.estimator._TrainingJob: Constructed object that captures all information about the started + training job. """ local_mode = estimator.sagemaker_session.local_mode @@ -334,87 +350,19 @@ def start_new(cls, estimator, inputs): if not local_mode: raise ValueError('File URIs are supported in local mode only. 
Please use a S3 URI instead.') - input_config = _TrainingJob._format_inputs_to_input_config(inputs) - role = estimator.sagemaker_session.expand_role(estimator.role) - output_config = _TrainingJob._prepare_output_config(estimator.output_path, estimator.output_kms_key) - resource_config = _TrainingJob._prepare_resource_config(estimator.train_instance_count, - estimator.train_instance_type, - estimator.train_volume_size) - stop_condition = _TrainingJob._prepare_stopping_condition(estimator.train_max_run) + config = _Job._load_config(inputs, estimator) if estimator.hyperparameters() is not None: hyperparameters = {str(k): str(v) for (k, v) in estimator.hyperparameters().items()} estimator.sagemaker_session.train(image=estimator.train_image(), input_mode=estimator.input_mode, - input_config=input_config, role=role, job_name=estimator._current_job_name, - output_config=output_config, resource_config=resource_config, - hyperparameters=hyperparameters, stop_condition=stop_condition, - tags=estimator.tags) + input_config=config['input_config'], role=config['role'], + job_name=estimator._current_job_name, output_config=config['output_config'], + resource_config=config['resource_config'], hyperparameters=hyperparameters, + stop_condition=config['stop_condition'], tags=estimator.tags) return cls(estimator.sagemaker_session, estimator._current_job_name) - @staticmethod - def _format_inputs_to_input_config(inputs): - input_dict = {} - if isinstance(inputs, string_types): - input_dict['training'] = _TrainingJob._format_string_uri_input(inputs) - elif isinstance(inputs, s3_input): - input_dict['training'] = inputs - elif isinstance(input, file_input): - input_dict['training'] = inputs - elif isinstance(inputs, dict): - for k, v in inputs.items(): - input_dict[k] = _TrainingJob._format_string_uri_input(v) - else: - raise ValueError('Cannot format input {}. Expecting one of str, dict or s3_input'.format(inputs)) - - channels = [] - for channel_name, channel_s3_input in input_dict.items(): - channel_config = channel_s3_input.config.copy() - channel_config['ChannelName'] = channel_name - channels.append(channel_config) - return channels - - @staticmethod - def _format_string_uri_input(input): - if isinstance(input, str): - if input.startswith('s3://'): - return s3_input(input) - elif input.startswith('file://'): - return file_input(input) - else: - raise ValueError('Training input data must be a valid S3 or FILE URI: must start with "s3://" or ' - '"file://"') - elif isinstance(input, s3_input): - return input - elif isinstance(input, file_input): - return input - else: - raise ValueError('Cannot format input {}. Expecting one of str, s3_input, or file_input'.format(input)) - - @staticmethod - def _prepare_output_config(s3_path, kms_key_id): - config = {'S3OutputPath': s3_path} - if kms_key_id is not None: - config['KmsKeyId'] = kms_key_id - return config - - @staticmethod - def _prepare_resource_config(instance_count, instance_type, volume_size): - resource_config = {'InstanceCount': instance_count, - 'InstanceType': instance_type, - 'VolumeSizeInGB': volume_size} - return resource_config - - @staticmethod - def _prepare_stopping_condition(max_run): - stop_condition = {'MaxRuntimeInSeconds': max_run} - return stop_condition - - @property - def name(self): - return self.job_name - def wait(self, logs=True): if logs: self.sagemaker_session.logs_for_job(self.job_name, wait=True) @@ -475,8 +423,7 @@ def train_image(self): """ Returns the docker image to use for training. 
- The fit() method, that does the model training, calls this method to find the image to use - for model training. + The fit() method, that does the model training, calls this method to find the image to use for model training. """ return self.image_name @@ -575,39 +522,14 @@ def __init__(self, entry_point, source_dir=None, hyperparameters=None, enable_cl self._hyperparameters = hyperparameters or {} self.code_location = code_location - def fit(self, inputs, wait=True, logs=True, job_name=None): - """Train a model using the input training dataset. - - The API calls the Amazon SageMaker CreateTrainingJob API to start model training. - The API uses configuration you provided to create the estimator and the - specified input training data to send the CreatingTrainingJob request to Amazon SageMaker. - - This is a synchronous operation. After the model training successfully completes, - you can call the ``deploy()`` method to host the model using the Amazon SageMaker hosting services. + def _prepare_for_training(self, job_name=None): + """Set hyperparameters needed for training. This method will also validate ``source_dir``. Args: - inputs (str or dict or sagemaker.session.s3_input): Information about the training data. - This can be one of three types: - (str) - the S3 location where training data is saved. - (dict[str, str] or dict[str, sagemaker.session.s3_input]) - If using multiple channels for - training data, you can specify a dict mapping channel names - to strings or :func:`~sagemaker.session.s3_input` objects. - (sagemaker.session.s3_input) - channel configuration for S3 data sources that can provide - additional information about the training dataset. See :func:`sagemaker.session.s3_input` - for full details. - wait (bool): Whether the call shouldl wait until the job completes (default: True). - logs (bool): Whether to show the logs produced by the job. - Only meaningful when wait is True (default: True). - job_name (str): Training job name. If not specified, the estimator generates a default job name, - based on the training image name and current timestamp. + * job_name (str): Name of the training job to be created. If not specified, one is generated, + using the base name given to the constructor if applicable. """ - # always determine new job name _here_ because it is used before base is called - if job_name is not None: - self._current_job_name = job_name - else: - # honor supplied base_job_name or generate it - base_name = self.base_job_name or base_name_from_image(self.train_image()) - self._current_job_name = name_from_base(base_name) + super(Framework, self)._prepare_for_training(job_name=job_name) # validate source dir will raise a ValueError if there is something wrong with the # source directory. We are intentionally not handling it because this is a critical error. @@ -637,7 +559,6 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): self._hyperparameters[CONTAINER_LOG_LEVEL_PARAM_NAME] = self.container_log_level self._hyperparameters[JOB_NAME_PARAM_NAME] = self._current_job_name self._hyperparameters[SAGEMAKER_REGION_PARAM_NAME] = self.sagemaker_session.boto_region_name - super(Framework, self).fit(inputs, wait, logs, self._current_job_name) def _stage_user_code_in_s3(self): """ Upload the user training script to s3 and return the location. 
@@ -689,7 +610,17 @@ def _prepare_init_params_from_job_description(cls, job_details): init_params['container_log_level'] = json.loads( init_params['hyperparameters'].get(CONTAINER_LOG_LEVEL_PARAM_NAME)) - init_params['hyperparameters'] = {k: json.loads(v) for k, v in init_params['hyperparameters'].items()} + hyperparameters = {} + for k, v in init_params['hyperparameters'].items(): + # Tuning jobs add this special hyperparameter which is not JSON serialized + if k == '_tuning_objective_metric': + if v.startswith('"') and v.endswith('"'): + v = v.strip('"') + hyperparameters[k] = v + else: + hyperparameters[k] = json.loads(v) + + init_params['hyperparameters'] = hyperparameters return init_params diff --git a/src/sagemaker/job.py b/src/sagemaker/job.py new file mode 100644 index 0000000000..0787646aa6 --- /dev/null +++ b/src/sagemaker/job.py @@ -0,0 +1,155 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +from abc import abstractmethod +from six import string_types + +from sagemaker.local import file_input +from sagemaker.session import s3_input + + +class _Job(object): + """Handle creating, starting and waiting for Amazon SageMaker jobs to finish. + + This class shouldn't be directly instantiated. + + Subclasses must define a way to create, start and wait for an Amazon SageMaker job. + """ + + def __init__(self, sagemaker_session, job_name): + self.sagemaker_session = sagemaker_session + self.job_name = job_name + + @abstractmethod + def start_new(cls, estimator, inputs): + """Create a new Amazon SageMaker job from the estimator. + + Args: + estimator (sagemaker.estimator.EstimatorBase): Estimator object created by the user. + inputs (str): Parameters used when called :meth:`~sagemaker.estimator.EstimatorBase.fit`. + + Returns: + sagemaker.job: Constructed object that captures all information about the started job. + """ + pass + + @abstractmethod + def wait(self): + """Wait for the Amazon SageMaker job to finish. 
+ """ + pass + + @staticmethod + def _load_config(inputs, estimator): + input_config = _Job._format_inputs_to_input_config(inputs) + role = estimator.sagemaker_session.expand_role(estimator.role) + output_config = _Job._prepare_output_config(estimator.output_path, estimator.output_kms_key) + resource_config = _Job._prepare_resource_config(estimator.train_instance_count, + estimator.train_instance_type, + estimator.train_volume_size) + stop_condition = _Job._prepare_stop_condition(estimator.train_max_run) + + return {'input_config': input_config, + 'role': role, + 'output_config': output_config, + 'resource_config': resource_config, + 'stop_condition': stop_condition} + + @staticmethod + def _format_inputs_to_input_config(inputs): + # Deferred import due to circular dependency + from sagemaker.amazon.amazon_estimator import RecordSet + if isinstance(inputs, RecordSet): + inputs = inputs.data_channel() + + input_dict = {} + if isinstance(inputs, string_types): + input_dict['training'] = _Job._format_string_uri_input(inputs) + elif isinstance(inputs, s3_input): + input_dict['training'] = inputs + elif isinstance(input, file_input): + input_dict['training'] = inputs + elif isinstance(inputs, dict): + for k, v in inputs.items(): + input_dict[k] = _Job._format_string_uri_input(v) + elif isinstance(inputs, list): + input_dict = _Job._format_record_set_list_input(inputs) + else: + raise ValueError( + 'Cannot format input {}. Expecting one of str, dict or s3_input'.format(inputs)) + + channels = [] + for channel_name, channel_s3_input in input_dict.items(): + channel_config = channel_s3_input.config.copy() + channel_config['ChannelName'] = channel_name + channels.append(channel_config) + return channels + + @staticmethod + def _format_string_uri_input(input): + if isinstance(input, str): + if input.startswith('s3://'): + return s3_input(input) + elif input.startswith('file://'): + return file_input(input) + else: + raise ValueError( + 'Training input data must be a valid S3 or FILE URI: must start with "s3://" or ' + '"file://"') + elif isinstance(input, s3_input): + return input + elif isinstance(input, file_input): + return input + else: + raise ValueError( + 'Cannot format input {}. 
Expecting one of str, s3_input, or file_input'.format( + input)) + + @staticmethod + def _format_record_set_list_input(inputs): + # Deferred import due to circular dependency + from sagemaker.amazon.amazon_estimator import RecordSet + + input_dict = {} + for record in inputs: + if not isinstance(record, RecordSet): + raise ValueError('List compatible only with RecordSets.') + + if record.channel in input_dict: + raise ValueError('Duplicate channels not allowed.') + + input_dict[record.channel] = record.records_s3_input() + + return input_dict + + @staticmethod + def _prepare_output_config(s3_path, kms_key_id): + config = {'S3OutputPath': s3_path} + if kms_key_id is not None: + config['KmsKeyId'] = kms_key_id + return config + + @staticmethod + def _prepare_resource_config(instance_count, instance_type, volume_size): + return {'InstanceCount': instance_count, + 'InstanceType': instance_type, + 'VolumeSizeInGB': volume_size} + + @staticmethod + def _prepare_stop_condition(max_run): + return {'MaxRuntimeInSeconds': max_run} + + @property + def name(self): + return self.job_name diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 752936bdc6..c4e132e5cf 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -12,18 +12,17 @@ # language governing permissions and limitations under the License. from __future__ import print_function, absolute_import +import json import logging -import re - import os +import re import sys import time import boto3 -import json +import botocore.config import six import yaml -import botocore.config from botocore.exceptions import ClientError from sagemaker.user_agent import prepend_user_agent @@ -262,6 +261,101 @@ def train(self, image, input_mode, input_config, role, job_name, output_config, LOGGER.debug('train request: {}'.format(json.dumps(train_request, indent=4))) self.sagemaker_client.create_training_job(**train_request) + def tune(self, job_name, strategy, objective_type, objective_metric_name, + max_jobs, max_parallel_jobs, parameter_ranges, + static_hyperparameters, image, input_mode, metric_definitions, + role, input_config, output_config, resource_config, stop_condition, tags): + """Create an Amazon SageMaker hyperparameter tuning job. + + Args: + job_name (str): Name of the tuning job being created. + strategy (str): Strategy to be used for hyperparameter estimations (e.g. 'Bayesian'). + objective_type (str): The type of the objective metric for evaluating training jobs. + This value can be either 'Minimize' or 'Maximize'. + objective_metric_name (str): Name of the metric to use when evaluating training jobs. + max_jobs (int): Maximum total number of jobs to start. + max_parallel_jobs (int): Maximum number of parallel jobs to start. + parameter_ranges (dict): Parameter ranges in a dictionary of types: Continuous, Integer, Categorical + static_hyperparameters (dict): Hyperparameters for model training. The hyperparameters are made accessible + as a dict[str, str] to the training code on SageMaker. For convenience, this accepts other types for + keys and values, but ``str()`` will be called to convert them before training. + image (str): Docker image containing training code. + input_mode (str): The input mode that the algorithm supports. Valid modes: + + * 'File' - Amazon SageMaker copies the training dataset from the S3 location to + a directory in the Docker container. + * 'Pipe' - Amazon SageMaker streams data directly from S3 to the container via a Unix-named pipe. + metric_definitions (list[dict]): Metric definitions, each with a 'Name' key for the metric name and a + 'Regex' key for the regular expression used to extract the metric from training logs. + role (str): An AWS IAM role (either name or full ARN).
The Amazon SageMaker training jobs and APIs + that create Amazon SageMaker endpoints use this role to access training data and model artifacts. + You must grant sufficient permissions to this role. + input_config (list): A list of Channel objects. Each channel is a named input source. Please refer to + the format details described: + https://botocore.readthedocs.io/en/latest/reference/services/sagemaker.html#SageMaker.Client.create_training_job + output_config (dict): The S3 URI where you want to store the training results and optional KMS key ID. + resource_config (dict): Contains values for ResourceConfig: + instance_count (int): Number of EC2 instances to use for training. + instance_type (str): Type of EC2 instance to use for training, for example, 'ml.c4.xlarge'. + stop_condition (dict): Defines when training shall finish. Contains entries that can be understood by the + service like ``MaxRuntimeInSeconds``. + tags (list[dict]): List of tags for labeling the tuning job. + """ + tune_request = { + 'HyperParameterTuningJobName': job_name, + 'HyperParameterTuningJobConfig': { + 'Strategy': strategy, + 'HyperParameterTuningJobObjective': { + 'Type': objective_type, + 'MetricName': objective_metric_name, + }, + 'ResourceLimits': { + 'MaxNumberOfTrainingJobs': max_jobs, + 'MaxParallelTrainingJobs': max_parallel_jobs, + }, + 'ParameterRanges': parameter_ranges, + }, + 'TrainingJobDefinition': { + 'StaticHyperParameters': static_hyperparameters, + 'AlgorithmSpecification': { + 'TrainingImage': image, + 'TrainingInputMode': input_mode, + }, + 'RoleArn': role, + 'InputDataConfig': input_config, + 'OutputDataConfig': output_config, + 'ResourceConfig': resource_config, + 'StoppingCondition': stop_condition, + } + } + + if metric_definitions is not None: + tune_request['TrainingJobDefinition']['AlgorithmSpecification']['MetricDefinitions'] = metric_definitions + + if tags is not None: + tune_request['Tags'] = tags + + LOGGER.info('Creating hyperparameter tuning job with name: {}'.format(job_name)) + LOGGER.debug('tune request: {}'.format(json.dumps(tune_request, indent=4))) + self.sagemaker_client.create_hyper_parameter_tuning_job(**tune_request) + + def stop_tuning_job(self, name): + """Attempts to stop tuning job on Amazon SageMaker with specified name. + + Args: + name: Name of Amazon SageMaker tuning job. + """ + try: + LOGGER.info('Stopping tuning job: {}'.format(name)) + self.sagemaker_client.stop_hyper_parameter_tuning_job(HyperParameterTuningJobName=name) + except ClientError as e: + error_code = e.response['Error']['Code'] + # allow to pass if the job already stopped + if error_code == 'ValidationException': + LOGGER.info('Tuning job: {} is already stopped or not running.'.format(name)) + pass + else: + LOGGER.error('Error occurred while attempting to stop tuning job: {}. Please try again.'.format(name)) + raise + def create_model(self, name, role, primary_container): """Create an Amazon SageMaker ``Model``. @@ -393,21 +487,39 @@ def wait_for_job(self, job, poll=5): ValueError: If the training job fails. """ desc = _wait_until(lambda: _train_done(self.sagemaker_client, job), poll) - self._check_job_status(job, desc) + self._check_job_status(job, desc, 'TrainingJobStatus') return desc - def _check_job_status(self, job, desc): + def wait_for_tuning_job(self, job, poll=5): + """Wait for an Amazon SageMaker tuning job to complete. + + Args: + job (str): Name of the tuning job to wait for. + poll (int): Polling interval in seconds (default: 5). 
+ + Returns: + (dict): Return value from the ``DescribeHyperParameterTuningJob`` API. + + Raises: + ValueError: If the hyperparameter tuning job fails. + """ + desc = _wait_until(lambda: _tuning_job_status(self.sagemaker_client, job), poll) + self._check_job_status(job, desc, 'HyperParameterTuningJobStatus') + return desc + + def _check_job_status(self, job, desc, status_key_name): """Check to see if the job completed successfully and, if not, construct and raise a ValueError. Args: job (str): The name of the job to check. desc (dict[str, str]): The result of ``describe_training_job()``. + status_key_name (str): Status key name to check for. Raises: ValueError: If the training job fails. """ - status = desc['TrainingJobStatus'] + status = desc[status_key_name] if status != 'Completed' and status != 'Stopped': reason = desc.get('FailureReason', '(No reason provided)') @@ -671,7 +783,7 @@ def logs_for_job(self, job_name, wait=False, poll=10): # noqa: C901 - suppress state = LogState.JOB_COMPLETE if wait: - self._check_job_status(job_name, description) + self._check_job_status(job_name, description, 'TrainingJobStatus') if dot: print() print('===== Job Complete =====') @@ -823,6 +935,29 @@ def _train_done(sagemaker_client, job_name): return desc +def _tuning_job_status(sagemaker_client, job_name): + tuning_status_codes = { + 'Completed': '!', + 'InProgress': '.', + 'Failed': '*', + 'Stopped': 's', + 'Stopping': '_' + } + in_progress_statuses = ['InProgress', 'Stopping'] + + desc = sagemaker_client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=job_name) + status = desc['HyperParameterTuningJobStatus'] + + print(tuning_status_codes.get(status, '?'), end='') + sys.stdout.flush() + + if status in in_progress_statuses: + return None + + print('') + return desc + + def _deploy_done(sagemaker_client, endpoint_name): hosting_status_codes = { "OutOfService": "x", diff --git a/src/sagemaker/tuner.py b/src/sagemaker/tuner.py new file mode 100644 index 0000000000..39e70827f5 --- /dev/null +++ b/src/sagemaker/tuner.py @@ -0,0 +1,399 @@ +# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import + +import importlib +import inspect +import json + +from sagemaker.amazon.amazon_estimator import AmazonAlgorithmEstimatorBase, RecordSet +from sagemaker.amazon.hyperparameter import Hyperparameter as hp # noqa +from sagemaker.analytics import HyperparameterTuningJobAnalytics +from sagemaker.estimator import Framework +from sagemaker.job import _Job +from sagemaker.session import Session +from sagemaker.utils import base_name_from_image, name_from_base, to_str + +AMAZON_ESTIMATOR_MODULE = 'sagemaker' +AMAZON_ESTIMATOR_CLS_NAMES = { + 'factorization-machines': 'FactorizationMachines', + 'kmeans': 'KMeans', + 'lda': 'LDA', + 'linear-learner': 'LinearLearner', + 'ntm': 'NTM', + 'pca': 'PCA', + 'randomcutforest': 'RandomCutForest', +} + + +class _ParameterRange(object): + __all_types__ = ['Continuous', 'Categorical', 'Integer'] + + def __init__(self, min_value, max_value): + self.min_value = min_value + self.max_value = max_value + + def as_tuning_range(self, name): + return {'Name': name, + 'MinValue': to_str(self.min_value), + 'MaxValue': to_str(self.max_value)} + + +class ContinuousParameter(_ParameterRange): + __name__ = 'Continuous' + + def __init__(self, min_value, max_value): + super(ContinuousParameter, self).__init__(min_value, max_value) + + +class CategoricalParameter(_ParameterRange): + __name__ = 'Categorical' + + def __init__(self, values): + if isinstance(values, list): + self.values = [to_str(v) for v in values] + else: + self.values = [to_str(values)] + + def as_tuning_range(self, name): + return {'Name': name, + 'Values': self.values} + + def as_json_range(self, name): + return {'Name': name, 'Values': [json.dumps(v) for v in self.values]} + + +class IntegerParameter(_ParameterRange): + __name__ = 'Integer' + + def __init__(self, min_value, max_value): + super(IntegerParameter, self).__init__(min_value, max_value) + + +class HyperparameterTuner(object): + TUNING_JOB_NAME_MAX_LENGTH = 32 + + SAGEMAKER_ESTIMATOR_MODULE = 'sagemaker_estimator_module' + SAGEMAKER_ESTIMATOR_CLASS_NAME = 'sagemaker_estimator_class_name' + + DEFAULT_ESTIMATOR_MODULE = 'sagemaker.estimator' + DEFAULT_ESTIMATOR_CLS_NAME = 'Estimator' + + def __init__(self, estimator, objective_metric_name, hyperparameter_ranges, metric_definitions=None, + strategy='Bayesian', objective_type='Maximize', max_jobs=1, max_parallel_jobs=1, + tags=None, base_tuning_job_name=None): + self._hyperparameter_ranges = hyperparameter_ranges + if self._hyperparameter_ranges is None or len(self._hyperparameter_ranges) == 0: + raise ValueError('Need to specify hyperparameter ranges') + + self.estimator = estimator + self.objective_metric_name = objective_metric_name + self.metric_definitions = metric_definitions + self._validate_parameter_ranges() + + self.strategy = strategy + self.objective_type = objective_type + self.max_jobs = max_jobs + self.max_parallel_jobs = max_parallel_jobs + + self.tags = tags + self.base_tuning_job_name = base_tuning_job_name + self._current_job_name = None + self.latest_tuning_job = None + + def _prepare_for_training(self, job_name=None): + if job_name is not None: + self._current_job_name = job_name + else: + base_name = self.base_tuning_job_name or base_name_from_image(self.estimator.train_image()) + self._current_job_name = name_from_base(base_name, max_length=self.TUNING_JOB_NAME_MAX_LENGTH, short=True) + + self.static_hyperparameters = {to_str(k): to_str(v) for (k, v) in self.estimator.hyperparameters().items()} + for hyperparameter_name in 
self._hyperparameter_ranges.keys(): + self.static_hyperparameters.pop(hyperparameter_name, None) + + # For attach() to know what estimator to use for non-1P algorithms + # (1P algorithms don't accept extra hyperparameters) + if not isinstance(self.estimator, AmazonAlgorithmEstimatorBase): + self.static_hyperparameters[self.SAGEMAKER_ESTIMATOR_CLASS_NAME] = json.dumps( + self.estimator.__class__.__name__) + self.static_hyperparameters[self.SAGEMAKER_ESTIMATOR_MODULE] = json.dumps(self.estimator.__module__) + + def fit(self, inputs, job_name=None, **kwargs): + """Start a hyperparameter tuning job. + + Args: + inputs (str): Parameters used when called :meth:`~sagemaker.estimator.EstimatorBase.fit`. + job_name (str): Tuning job name. If not specified, the tuner generates a default job name, + based on the training image name and current timestamp. + **kwargs: Other arguments + """ + if isinstance(inputs, list) or isinstance(inputs, RecordSet): + self.estimator._prepare_for_training(inputs, **kwargs) + else: + self.estimator._prepare_for_training(job_name) + + self._prepare_for_training(job_name=job_name) + self.latest_tuning_job = _TuningJob.start_new(self, inputs) + + @classmethod + def attach(cls, tuning_job_name, sagemaker_session=None, job_details=None, estimator_cls=None): + sagemaker_session = sagemaker_session or Session() + + if job_details is None: + job_details = sagemaker_session.sagemaker_client \ + .describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name) + + estimator_cls = cls._prepare_estimator_cls(estimator_cls, job_details['TrainingJobDefinition']) + estimator = cls._prepare_estimator_from_job_description(estimator_cls, job_details['TrainingJobDefinition'], + sagemaker_session) + init_params = cls._prepare_init_params_from_job_description(job_details) + + tuner = cls(estimator=estimator, **init_params) + tuner.latest_tuning_job = _TuningJob(sagemaker_session=sagemaker_session, tuning_job_name=tuning_job_name) + + return tuner + + def deploy(self, initial_instance_count, instance_type, endpoint_name=None, **kwargs): + """Deploy the best trained or user specified model to an Amazon SageMaker endpoint and return a + ``sagemaker.RealTimePredictor`` + object. + + More information: + http://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html + + Args: + initial_instance_count (int): Minimum number of EC2 instances to deploy to an endpoint for + prediction. + instance_type (str): Type of EC2 instance to deploy to an endpoint for prediction, + for example, 'ml.c4.xlarge'. + endpoint_name (str): Name to use for creating an Amazon SageMaker endpoint. If not specified, + the name of the training job is used. + **kwargs: Passed to invocation of ``create_model()``. Implementations may customize + ``create_model()`` to accept ``**kwargs`` to customize model creation during deploy. + For more, see the implementation docs. + + Returns: + sagemaker.predictor.RealTimePredictor: A predictor that provides a ``predict()`` method, + which can be used to send requests to the Amazon SageMaker endpoint and obtain inferences. + """ + endpoint_name = endpoint_name or self.best_training_job() + best_estimator = self.estimator.attach(self.best_training_job(), + sagemaker_session=self.estimator.sagemaker_session) + return best_estimator.deploy(initial_instance_count, instance_type, endpoint_name=endpoint_name, **kwargs) + + def stop_tuning_job(self): + """Stop latest running tuning job. 
+ """ + self._ensure_last_tuning_job() + self.latest_tuning_job.stop() + + def wait(self): + """Wait for latest tuning job to finish. + """ + self._ensure_last_tuning_job() + self.latest_tuning_job.wait() + + def best_training_job(self): + """Return name of the best training job for the latest tuning job. + """ + self._ensure_last_tuning_job() + + tuning_job_describe_result = \ + self.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job( + HyperParameterTuningJobName=self.latest_tuning_job.name) + + try: + return tuning_job_describe_result['BestTrainingJob']['TrainingJobName'] + except KeyError: + raise Exception('Best training job not available for tuning job: {}'.format(self.latest_tuning_job.name)) + + def delete_endpoint(self, endpoint_name=None): + """Delete an Amazon SageMaker endpoint. + + If an endpoint name is not specified, this defaults to looking for an endpoint that + shares a name with the best training job for deletion. + + Args: + endpoint_name (str): Name of the endpoint to delete + """ + endpoint_name = endpoint_name or self.best_training_job() + self.sagemaker_session.delete_endpoint(endpoint_name) + + def _ensure_last_tuning_job(self): + if self.latest_tuning_job is None: + raise ValueError('No tuning job available') + + @classmethod + def _prepare_estimator_cls(cls, estimator_cls, training_details): + # Check for customer-specified estimator first + if estimator_cls is not None: + module, cls_name = estimator_cls.rsplit('.', 1) + return getattr(importlib.import_module(module), cls_name) + + # Then check for estimator class in hyperparameters + hyperparameters = training_details['StaticHyperParameters'] + if cls.SAGEMAKER_ESTIMATOR_CLASS_NAME in hyperparameters and cls.SAGEMAKER_ESTIMATOR_MODULE in hyperparameters: + module = hyperparameters.get(cls.SAGEMAKER_ESTIMATOR_MODULE) + cls_name = hyperparameters.get(cls.SAGEMAKER_ESTIMATOR_CLASS_NAME) + return getattr(importlib.import_module(json.loads(module)), json.loads(cls_name)) + + # Then try to derive the estimator from the image name for 1P algorithms + image_name = training_details['AlgorithmSpecification']['TrainingImage'] + algorithm = image_name[image_name.find('/') + 1:image_name.find(':')] + if algorithm in AMAZON_ESTIMATOR_CLS_NAMES: + cls_name = AMAZON_ESTIMATOR_CLS_NAMES[algorithm] + return getattr(importlib.import_module(AMAZON_ESTIMATOR_MODULE), cls_name) + + # Default to the BYO estimator + return getattr(importlib.import_module(cls.DEFAULT_ESTIMATOR_MODULE), cls.DEFAULT_ESTIMATOR_CLS_NAME) + + @classmethod + def _prepare_estimator_from_job_description(cls, estimator_cls, training_details, sagemaker_session): + # Swap name for static hyperparameters to what an estimator would expect + training_details['HyperParameters'] = training_details['StaticHyperParameters'] + del training_details['StaticHyperParameters'] + + # Remove hyperparameter reserved by SageMaker for tuning jobs + del training_details['HyperParameters']['_tuning_objective_metric'] + + # Add items expected by the estimator (but aren't needed otherwise) + training_details['TrainingJobName'] = '' + if 'KmsKeyId' not in training_details['OutputDataConfig']: + training_details['OutputDataConfig']['KmsKeyId'] = '' + + estimator_init_params = estimator_cls._prepare_init_params_from_job_description(training_details) + return estimator_cls(sagemaker_session=sagemaker_session, **estimator_init_params) + + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + tuning_config = 
+        return {
+            'metric_definitions': job_details['TrainingJobDefinition']['AlgorithmSpecification']['MetricDefinitions'],
+            'objective_metric_name': tuning_config['HyperParameterTuningJobObjective']['MetricName'],
+            'objective_type': tuning_config['HyperParameterTuningJobObjective']['Type'],
+            'hyperparameter_ranges': cls._prepare_parameter_ranges(tuning_config['ParameterRanges']),
+            'strategy': tuning_config['Strategy'],
+            'max_jobs': tuning_config['ResourceLimits']['MaxNumberOfTrainingJobs'],
+            'max_parallel_jobs': tuning_config['ResourceLimits']['MaxParallelTrainingJobs'],
+        }
+
+    @classmethod
+    def _prepare_parameter_ranges(cls, parameter_ranges):
+        ranges = {}
+
+        for parameter in parameter_ranges['CategoricalParameterRanges']:
+            ranges[parameter['Name']] = CategoricalParameter(parameter['Values'])
+
+        for parameter in parameter_ranges['ContinuousParameterRanges']:
+            ranges[parameter['Name']] = ContinuousParameter(float(parameter['MinValue']), float(parameter['MaxValue']))
+
+        for parameter in parameter_ranges['IntegerParameterRanges']:
+            ranges[parameter['Name']] = IntegerParameter(int(parameter['MinValue']), int(parameter['MaxValue']))
+
+        return ranges
+
+    def hyperparameter_ranges(self):
+        """Return the hyperparameter ranges in a dictionary.
+
+        Returns:
+            dict: Parameter ranges formatted as a hyperparameter tuning job expects them.
+        """
+        hyperparameter_ranges = dict()
+        for range_type in _ParameterRange.__all_types__:
+            parameter_ranges = []
+            for parameter_name, parameter in self._hyperparameter_ranges.items():
+                if parameter is not None and parameter.__name__ == range_type:
+                    # Categorical parameters need to be serialized as JSON for our framework containers
+                    if isinstance(parameter, CategoricalParameter) and isinstance(self.estimator, Framework):
+                        tuning_range = parameter.as_json_range(parameter_name)
+                    else:
+                        tuning_range = parameter.as_tuning_range(parameter_name)
+                    parameter_ranges.append(tuning_range)
+            hyperparameter_ranges[range_type + 'ParameterRanges'] = parameter_ranges
+        return hyperparameter_ranges
+
+    @property
+    def sagemaker_session(self):
+        """Convenience property: the tuner shares the ``sagemaker_session`` object of its estimator."""
+        return self.estimator.sagemaker_session
+
+    def analytics(self):
+        """Return an instance of ``HyperparameterTuningJobAnalytics`` for the latest tuning job of this tuner.
+
+        The analytics object gives you access to the tuning results summarized into a pandas dataframe.
+        """
+        return HyperparameterTuningJobAnalytics(self.latest_tuning_job.name, self.sagemaker_session)
+
+    def _validate_parameter_ranges(self):
+        for kls in inspect.getmro(self.estimator.__class__)[::-1]:
+            for attribute, value in kls.__dict__.items():
+                if isinstance(value, hp):
+                    try:
+                        # The hyperparameter names may not be the same as the class attributes that
+                        # hold them; for instance, local_lloyd_init_method is held by the attribute
+                        # local_init_method. We need to map these and pass the correct name to the
+                        # constructor.
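+                        # (the KeyError caught below simply means no range was given for
+                        # this hyperparameter, so there is nothing to validate)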
+                        parameter_range = self._hyperparameter_ranges[value.name]
+
+                        if isinstance(parameter_range, _ParameterRange):
+                            for parameter_range_attribute, parameter_range_value in parameter_range.__dict__.items():
+                                # Categorical ranges
+                                if isinstance(parameter_range_value, list):
+                                    for categorical_value in parameter_range_value:
+                                        value.validate(categorical_value)
+                                # Continuous, Integer ranges
+                                else:
+                                    value.validate(parameter_range_value)
+                    except KeyError:
+                        pass
+
+
+class _TuningJob(_Job):
+    def __init__(self, sagemaker_session, tuning_job_name):
+        super(_TuningJob, self).__init__(sagemaker_session, tuning_job_name)
+
+    @classmethod
+    def start_new(cls, tuner, inputs):
+        """Create a new Amazon SageMaker hyperparameter tuning job from the HyperparameterTuner.
+
+        Args:
+            tuner (sagemaker.tuner.HyperparameterTuner): HyperparameterTuner object created by the user.
+            inputs: Information about the training data, in the same format accepted by
+                :meth:`~sagemaker.estimator.EstimatorBase.fit`.
+
+        Returns:
+            sagemaker.tuner._TuningJob: Constructed object that captures all information about the started job.
+        """
+        config = _Job._load_config(inputs, tuner.estimator)
+
+        tuner.estimator.sagemaker_session.tune(job_name=tuner._current_job_name, strategy=tuner.strategy,
+                                               objective_type=tuner.objective_type,
+                                               objective_metric_name=tuner.objective_metric_name,
+                                               max_jobs=tuner.max_jobs, max_parallel_jobs=tuner.max_parallel_jobs,
+                                               parameter_ranges=tuner.hyperparameter_ranges(),
+                                               static_hyperparameters=tuner.static_hyperparameters,
+                                               image=tuner.estimator.train_image(),
+                                               input_mode=tuner.estimator.input_mode,
+                                               metric_definitions=tuner.metric_definitions,
+                                               role=config['role'], input_config=config['input_config'],
+                                               output_config=config['output_config'],
+                                               resource_config=config['resource_config'],
+                                               stop_condition=config['stop_condition'], tags=tuner.tags)
+
+        return cls(tuner.sagemaker_session, tuner._current_job_name)
+
+    def stop(self):
+        self.sagemaker_session.stop_tuning_job(name=self.name)
+
+    def wait(self):
+        self.sagemaker_session.wait_for_tuning_job(self.name)
diff --git a/src/sagemaker/utils.py b/src/sagemaker/utils.py
index 0ff5ba45c6..a6534425c3 100644
--- a/src/sagemaker/utils.py
+++ b/src/sagemaker/utils.py
@@ -12,6 +12,7 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
+import sys
 import time
 import re
 
@@ -31,20 +32,21 @@ def name_from_image(image):
     return name_from_base(base_name_from_image(image))
 
 
-def name_from_base(base):
+def name_from_base(base, max_length=63, short=False):
     """Append a timestamp to the provided string.
 
-    The appended timestamp is precise to the millisecond. This function assures that the total length of the resulting
-    string is not longer that 63, trimming the input parameter if necessary.
+    This function ensures that the total length of the resulting string is not
+    longer than the specified max length, trimming the input parameter if necessary.
 
     Args:
         base (str): String used as prefix to generate the unique name.
+        max_length (int): Maximum length for the resulting string.
+        short (bool): Whether or not to use a truncated timestamp.
 
     Returns:
-        str: Input parameter with appended timestamp (no longer than 63 characters).
+        str: Input parameter with appended timestamp.
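+
+    Example (illustrative; the timestamp suffix varies with the current time):
+        >>> name_from_base('my-job', max_length=20, short=True)
+        'my-job-180516-0102'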
""" - max_length = 63 - timestamp = sagemaker_timestamp() + timestamp = sagemaker_short_timestamp() if short else sagemaker_timestamp() trimmed_base = base[:max_length - len(timestamp) - 1] return '{}-{}'.format(trimmed_base, timestamp) @@ -70,6 +72,11 @@ def sagemaker_timestamp(): return time.strftime("%Y-%m-%d-%H-%M-%S-{}".format(moment_ms), time.gmtime(moment)) +def sagemaker_short_timestamp(): + """Return a timestamp that is relatively short in length""" + return time.strftime('%y%m%d-%H%M') + + def debug(func): """Print the function name and arguments for debugging.""" @wraps(func) @@ -92,3 +99,40 @@ def get_config_value(key_path, config): return None return current_section + + +def to_str(value): + """Convert the input to a string, unless it is a unicode string in Python 2. + + Unicode strings are supported as native strings in Python 3, but ``str()`` cannot be + invoked on unicode strings in Python 2, so we need to check for that case when + converting user-specified values to strings. + + Args: + value: The value to convert to a string. + + Returns: + str or unicode: The string representation of the value or the unicode string itself. + """ + if sys.version_info.major < 3 and isinstance(value, unicode): # noqa: F821 + return value + else: + return str(value) + + +class DeferredError(object): + """Stores an exception and raises it at a later time anytime this + object is accessed in any way. Useful to allow soft-dependencies on imports, + so that the ImportError can be raised again later if code actually + relies on the missing library. + """ + + def __init__(self, exception): + self.exc = exception + + def __getattr__(self, name): + """Called by Python interpreter before using any method or property + on the object. So this will short-circuit essentially any access to this + object. 
+ """ + raise self.exc diff --git a/tests/data/mxnet_mnist/tuning.py b/tests/data/mxnet_mnist/tuning.py new file mode 100644 index 0000000000..21cfb3d14d --- /dev/null +++ b/tests/data/mxnet_mnist/tuning.py @@ -0,0 +1,73 @@ +import gzip +import logging +import os +import struct + +import mxnet as mx +import numpy as np + + +def load_data(path): + with gzip.open(find_file(path, "labels.gz")) as flbl: + struct.unpack(">II", flbl.read(8)) + labels = np.fromstring(flbl.read(), dtype=np.int8) + with gzip.open(find_file(path, "images.gz")) as fimg: + _, _, rows, cols = struct.unpack(">IIII", fimg.read(16)) + images = np.fromstring(fimg.read(), dtype=np.uint8).reshape(len(labels), rows, cols) + images = images.reshape(images.shape[0], 1, 28, 28).astype(np.float32) / 255 + return labels, images + + +def find_file(root_path, file_name): + for root, dirs, files in os.walk(root_path): + if file_name in files: + return os.path.join(root, file_name) + + +def build_graph(): + data = mx.sym.var('data') + data = mx.sym.flatten(data=data) + fc1 = mx.sym.FullyConnected(data=data, num_hidden=128) + act1 = mx.sym.Activation(data=fc1, act_type="relu") + fc2 = mx.sym.FullyConnected(data=act1, num_hidden=64) + act2 = mx.sym.Activation(data=fc2, act_type="relu") + fc3 = mx.sym.FullyConnected(data=act2, num_hidden=10) + return mx.sym.SoftmaxOutput(data=fc3, name='softmax') + + +def train(current_host, channel_input_dirs, hyperparameters, hosts, num_cpus, num_gpus): + (train_labels, train_images) = load_data(os.path.join(channel_input_dirs['train'])) + (test_labels, test_images) = load_data(os.path.join(channel_input_dirs['test'])) + + # Alternatively to splitting in memory, the data could be pre-split in S3 and use ShardedByS3Key + # to do parallel training. + shard_size = len(train_images) // len(hosts) + for i, host in enumerate(hosts): + if host == current_host: + start = shard_size * i + end = start + shard_size + break + + batch_size = 100 + train_iter = mx.io.NDArrayIter(train_images[start:end], train_labels[start:end], batch_size, shuffle=True) + val_iter = mx.io.NDArrayIter(test_images, test_labels, batch_size) + logging.getLogger().setLevel(logging.DEBUG) + kvstore = 'local' if len(hosts) == 1 else 'dist_sync' + mlp_model = mx.mod.Module( + symbol=build_graph(), + context=get_train_context(num_cpus, num_gpus)) + mlp_model.fit(train_iter, + eval_data=val_iter, + kvstore=kvstore, + optimizer='sgd', + optimizer_params={'learning_rate': float(hyperparameters.get("learning_rate", 0.1))}, + eval_metric='acc', + batch_end_callback=mx.callback.Speedometer(batch_size, 100), + num_epoch=25) + return mlp_model + + +def get_train_context(num_cpus, num_gpus): + if num_gpus > 0: + return mx.gpu() + return mx.cpu() diff --git a/tests/integ/test_tuner.py b/tests/integ/test_tuner.py new file mode 100644 index 0000000000..417ca03b41 --- /dev/null +++ b/tests/integ/test_tuner.py @@ -0,0 +1,97 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import
+
+import gzip
+import os
+import pickle
+import sys
+
+import pytest
+
+from sagemaker.amazon.kmeans import KMeans
+from sagemaker.mxnet.estimator import MXNet
+from sagemaker.tuner import IntegerParameter, ContinuousParameter, CategoricalParameter, HyperparameterTuner
+from tests.integ import DATA_DIR
+from tests.integ.timeout import timeout
+
+
+@pytest.mark.skip(reason='functionality is not ready yet')
+def test_fit_1p(sagemaker_session):
+    data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz')
+    pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'}
+
+    # Load the data into memory as numpy arrays
+    with gzip.open(data_path, 'rb') as f:
+        train_set, _, _ = pickle.load(f, **pickle_args)
+
+    kmeans = KMeans(role='SageMakerRole', train_instance_count=1,
+                    train_instance_type='ml.c4.xlarge',
+                    k=10, sagemaker_session=sagemaker_session, base_job_name='tk',
+                    output_path='s3://{}/'.format(sagemaker_session.default_bucket()))
+
+    # set KMeans-specific hyperparameters
+    kmeans.init_method = 'random'
+    kmeans.max_iterations = 1
+    kmeans.tol = 1
+    kmeans.num_trials = 1
+    kmeans.local_init_method = 'kmeans++'
+    kmeans.half_life_time_size = 1
+    kmeans.epochs = 1
+
+    records = kmeans.record_set(train_set[0][:100])
+    test_records = kmeans.record_set(train_set[0][:100], channel='test')
+
+    # specify which hyperparameters to optimize over
+    hyperparameter_ranges = {'extra_center_factor': IntegerParameter(1, 10),
+                             'mini_batch_size': IntegerParameter(10, 100),
+                             'epochs': IntegerParameter(1, 2),
+                             'init_method': CategoricalParameter(['kmeans++', 'random'])}
+    objective_metric_name = 'test:msd'
+
+    tuner = HyperparameterTuner(estimator=kmeans, objective_metric_name=objective_metric_name,
+                                hyperparameter_ranges=hyperparameter_ranges, objective_type='Minimize', max_jobs=2,
+                                max_parallel_jobs=2)
+
+    tuner.fit([records, test_records])
+
+    print('Started HPO job with name: ' + tuner.latest_tuning_job.name)
+
+
+@pytest.mark.skip(reason='functionality is not ready yet')
+def test_mxnet_tuning(sagemaker_session, mxnet_full_version):
+    with timeout(minutes=15):
+        script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'tuning.py')
+        data_path = os.path.join(DATA_DIR, 'mxnet_mnist')
+
+        estimator = MXNet(entry_point=script_path,
+                          role='SageMakerRole',
+                          framework_version=mxnet_full_version,
+                          train_instance_count=1,
+                          train_instance_type='ml.m4.xlarge',
+                          sagemaker_session=sagemaker_session,
+                          base_job_name='hpo')
+
+        hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.01, 0.2)}
+        objective_metric_name = 'Validation-accuracy'
+        metric_definitions = [{'Name': 'Validation-accuracy', 'Regex': 'Validation-accuracy=([0-9\\.]+)'}]
+        tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions,
+                                    max_jobs=4, max_parallel_jobs=2)
+
+        train_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
+                                                              key_prefix='integ-test-data/mxnet_mnist/train')
+        test_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'),
+                                                             key_prefix='integ-test-data/mxnet_mnist/test')
+        tuner.fit({'train': train_input, 'test': test_input})
+
+        print('tuning job successfully created: {}'.format(tuner.latest_tuning_job.name))
diff --git a/tests/unit/test_amazon_estimator.py b/tests/unit/test_amazon_estimator.py
index 4edc787ff0..07366e36c2 100644
--- a/tests/unit/test_amazon_estimator.py
+++ b/tests/unit/test_amazon_estimator.py
@@ -12,9 +12,9 @@
 # language governing permissions and
limitations under the License. from __future__ import absolute_import +import numpy as np import pytest from mock import Mock, patch, call -import numpy as np # Use PCA as a test implementation of AmazonAlgorithmEstimator from sagemaker.amazon.pca import PCA @@ -98,6 +98,43 @@ def test_data_location_does_not_call_default_bucket(sagemaker_session): assert not sagemaker_session.default_bucket.called +def test_prepare_for_training(sagemaker_session): + pca = PCA(num_components=55, sagemaker_session=sagemaker_session, **COMMON_ARGS) + + train = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 8.0], [44.0, 55.0, 66.0]] + labels = [99, 85, 87, 2] + records = pca.record_set(np.array(train), np.array(labels)) + + pca._prepare_for_training(records, mini_batch_size=1) + assert pca.feature_dim == 3 + assert pca.mini_batch_size == 1 + + +def test_prepare_for_training_list(sagemaker_session): + pca = PCA(num_components=55, sagemaker_session=sagemaker_session, **COMMON_ARGS) + + train = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 8.0], [44.0, 55.0, 66.0]] + labels = [99, 85, 87, 2] + records = [pca.record_set(np.array(train), np.array(labels))] + + pca._prepare_for_training(records, mini_batch_size=1) + assert pca.feature_dim == 3 + assert pca.mini_batch_size == 1 + + +def test_prepare_for_training_list_no_train_channel(sagemaker_session): + pca = PCA(num_components=55, sagemaker_session=sagemaker_session, **COMMON_ARGS) + + train = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 8.0], [44.0, 55.0, 66.0]] + labels = [99, 85, 87, 2] + records = [pca.record_set(np.array(train), np.array(labels), 'test')] + + with pytest.raises(ValueError) as ex: + pca._prepare_for_training(records, mini_batch_size=1) + + assert 'Must provide train channel.' in str(ex) + + @patch('time.strftime', return_value=TIMESTAMP) def test_fit_ndarray(time, sagemaker_session): mock_s3 = Mock() diff --git a/tests/unit/test_analytics.py b/tests/unit/test_analytics.py new file mode 100644 index 0000000000..8909794178 --- /dev/null +++ b/tests/unit/test_analytics.py @@ -0,0 +1,200 @@ +# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
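+
+# Unit tests for the analytics classes; the SageMaker and CloudWatch clients are
+# mocked throughout, so no AWS calls are made.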
+from __future__ import absolute_import + +import datetime +import os +import uuid + +import pytest +from mock import Mock + +from sagemaker.analytics import AnalyticsMetricsBase, HyperparameterTuningJobAnalytics, TrainingJobAnalytics + +BUCKET_NAME = 'mybucket' +REGION = 'us-west-2' + + +@pytest.fixture() +def sagemaker_session(describe_training_result=None, list_training_results=None, metric_stats_results=None, + describe_tuning_result=None): + boto_mock = Mock(name='boto_session', region_name=REGION) + sms = Mock(name='sagemaker_session', boto_session=boto_mock, + boto_region_name=REGION, config=None, local_mode=False) + sms.default_bucket = Mock(name='default_bucket', return_value=BUCKET_NAME) + sms.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(name='describe_hyper_parameter_tuning_job', + return_value=describe_tuning_result) + sms.sagemaker_client.describe_training_job = Mock(name='describe_training_job', + return_value=describe_training_result) + sms.sagemaker_client.list_training_jobs_for_hyper_parameter_tuning_job = Mock( + name='list_training_jobs_for_hyper_parameter_tuning_job', + return_value=list_training_results, + ) + cwm_mock = Mock(name='cloudwatch_client') + boto_mock.client = Mock(return_value=cwm_mock) + cwm_mock.get_metric_statistics = Mock( + name='get_metric_statistics', + return_value=metric_stats_results, + ) + return sms + + +def test_abstract_base_class(): + # confirm that the abstract base class can't be instantiated directly + with pytest.raises(TypeError) as _: # noqa: F841 + AnalyticsMetricsBase() + + +def test_tuner_name(sagemaker_session): + tuner = HyperparameterTuningJobAnalytics("my-tuning-job", sagemaker_session=sagemaker_session) + assert tuner.name == "my-tuning-job" + assert str(tuner).find("my-tuning-job") != -1 + + +def test_tuner_dataframe(): + def mock_summary(name="job-name", value=0.9): + return { + "TrainingJobName": name, + "TrainingJobStatus": "Completed", + "FinalHyperParameterTuningJobObjectiveMetric": { + "Name": "awesomeness", + "Value": value, + }, + "TrainingStartTime": datetime.datetime(2018, 5, 16, 1, 2, 3), + "TrainingEndTime": datetime.datetime(2018, 5, 16, 5, 6, 7), + "TunedHyperParameters": { + "learning_rate": 0.1, + "layers": 137, + }, + } + session = sagemaker_session(list_training_results={ + "TrainingJobSummaries": [ + mock_summary(), + mock_summary(), + mock_summary(), + mock_summary(), + mock_summary(), + ] + }) + tuner = HyperparameterTuningJobAnalytics("my-tuning-job", sagemaker_session=session) + df = tuner.dataframe() + assert df is not None + assert len(df) == 5 + assert len(session.sagemaker_client.list_training_jobs_for_hyper_parameter_tuning_job.mock_calls) == 1 + + # Clear the cache, check that it calls the service again. + tuner.clear_cache() + df = tuner.dataframe() + assert len(session.sagemaker_client.list_training_jobs_for_hyper_parameter_tuning_job.mock_calls) == 2 + df = tuner.dataframe(force_refresh=True) + assert len(session.sagemaker_client.list_training_jobs_for_hyper_parameter_tuning_job.mock_calls) == 3 + + # check that the hyperparameter is in the dataframe + assert len(df['layers']) == 5 + assert min(df['layers']) == 137 + + # Check that the training time calculation is returning something sane. 
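+    # (each mocked summary runs from 01:02:03 to 05:06:07, i.e. 14644 seconds)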
+ assert min(df['TrainingElapsedTimeSeconds']) > 5 + assert max(df['TrainingElapsedTimeSeconds']) < 86400 + + # Export to CSV and check that file exists + tmp_name = "/tmp/unit-test-%s.csv" % uuid.uuid4() + assert not os.path.isfile(tmp_name) + tuner.export_csv(tmp_name) + assert os.path.isfile(tmp_name) + os.unlink(tmp_name) + + +def test_description(): + session = sagemaker_session(describe_tuning_result={ + 'HyperParameterTuningJobConfig': { + 'ParameterRanges': { + 'CategoricalParameterRanges': [], + 'ContinuousParameterRanges': [ + {'MaxValue': '1', 'MinValue': '0', 'Name': 'eta'}, + {'MaxValue': '10', 'MinValue': '0', 'Name': 'gamma'}, + ], + 'IntegerParameterRanges': [ + {'MaxValue': '30', 'MinValue': '5', 'Name': 'num_layers'}, + {'MaxValue': '100', 'MinValue': '50', 'Name': 'iterations'}, + ], + }, + }, + }) + tuner = HyperparameterTuningJobAnalytics("my-tuning-job", sagemaker_session=session) + + d = tuner.description() + assert len(session.sagemaker_client.describe_hyper_parameter_tuning_job.mock_calls) == 1 + assert d is not None + assert d['HyperParameterTuningJobConfig'] is not None + tuner.clear_cache() + d = tuner.description() + assert len(session.sagemaker_client.describe_hyper_parameter_tuning_job.mock_calls) == 2 + d = tuner.description() + assert len(session.sagemaker_client.describe_hyper_parameter_tuning_job.mock_calls) == 2 + d = tuner.description(force_refresh=True) + assert len(session.sagemaker_client.describe_hyper_parameter_tuning_job.mock_calls) == 3 + + # Check that the ranges work. + r = tuner.tuning_ranges + assert len(r) == 4 + + +def test_trainer_name(): + describe_training_result = { + 'TrainingStartTime': datetime.datetime(2018, 5, 16, 1, 2, 3), + 'TrainingEndTime': datetime.datetime(2018, 5, 16, 5, 6, 7), + } + session = sagemaker_session(describe_training_result) + trainer = TrainingJobAnalytics("my-training-job", ["metric"], sagemaker_session=session) + assert trainer.name == "my-training-job" + assert str(trainer).find("my-training-job") != -1 + + +def test_trainer_dataframe(): + describe_training_result = { + 'TrainingStartTime': datetime.datetime(2018, 5, 16, 1, 2, 3), + 'TrainingEndTime': datetime.datetime(2018, 5, 16, 5, 6, 7), + } + metric_stats_results = { + 'Datapoints': [ + { + 'Average': 77.1, + 'Timestamp': datetime.datetime(2018, 5, 16, 1, 3, 3), + }, + { + 'Average': 87.1, + 'Timestamp': datetime.datetime(2018, 5, 16, 1, 8, 3), + }, + { + 'Average': 97.1, + 'Timestamp': datetime.datetime(2018, 5, 16, 2, 3, 3), + }, + ] + } + session = sagemaker_session(describe_training_result=describe_training_result, + metric_stats_results=metric_stats_results) + trainer = TrainingJobAnalytics("my-training-job", ["train:acc"], sagemaker_session=session) + + df = trainer.dataframe() + assert df is not None + assert len(df) == 3 + assert min(df['value']) == 77.1 + assert max(df['value']) == 97.1 + + # Export to CSV and check that file exists + tmp_name = "/tmp/unit-test-%s.csv" % uuid.uuid4() + assert not os.path.isfile(tmp_name) + trainer.export_csv(tmp_name) + assert os.path.isfile(tmp_name) + os.unlink(tmp_name) diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index 551a1b1386..c0de4c94ba 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -15,6 +15,7 @@ import logging import json import os + import pytest from mock import Mock, patch @@ -27,7 +28,6 @@ MODEL_IMAGE = "mi" ENTRY_POINT = "blah.py" - DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data') SCRIPT_NAME = 'dummy_script.py' 
SCRIPT_PATH = os.path.join(DATA_DIR, SCRIPT_NAME) @@ -40,20 +40,22 @@ REGION = 'us-west-2' JOB_NAME = '{}-{}'.format(IMAGE_NAME, TIMESTAMP) -COMMON_TRAIN_ARGS = {'volume_size': 30, - 'hyperparameters': { - 'sagemaker_program': 'dummy_script.py', - 'sagemaker_enable_cloudwatch_metrics': False, - 'sagemaker_container_log_level': logging.INFO, - }, - 'input_mode': 'File', - 'instance_type': 'c4.4xlarge', - 'inputs': 's3://mybucket/train', - 'instance_count': 1, - 'role': 'DummyRole', - 'kms_key_id': None, - 'max_run': 24, - 'wait': True} +COMMON_TRAIN_ARGS = { + 'volume_size': 30, + 'hyperparameters': { + 'sagemaker_program': 'dummy_script.py', + 'sagemaker_enable_cloudwatch_metrics': False, + 'sagemaker_container_log_level': logging.INFO, + }, + 'input_mode': 'File', + 'instance_type': 'c4.4xlarge', + 'inputs': 's3://mybucket/train', + 'instance_count': 1, + 'role': 'DummyRole', + 'kms_key_id': None, + 'max_run': 24, + 'wait': True, +} DESCRIBE_TRAINING_JOB_RESULT = { 'ModelArtifacts': { @@ -79,7 +81,6 @@ def _prepare_init_params_from_job_description(cls, job_details): class DummyFrameworkModel(FrameworkModel): - def __init__(self, sagemaker_session, **kwargs): super(DummyFrameworkModel, self).__init__(MODEL_DATA, MODEL_IMAGE, INSTANCE_TYPE, ROLE, ENTRY_POINT, sagemaker_session=sagemaker_session, **kwargs) @@ -238,28 +239,35 @@ def test_enable_cloudwatch_metrics(sagemaker_session): def test_attach_framework(sagemaker_session): - returned_job_description = {'AlgorithmSpecification': - {'TrainingInputMode': 'File', - 'TrainingImage': '1.dkr.ecr.us-west-2.amazonaws.com/sagemaker-other-py2-cpu:1.0.4'}, - 'HyperParameters': - {'sagemaker_submit_directory': '"s3://some/sourcedir.tar.gz"', - 'checkpoint_path': '"s3://other/1508872349"', - 'sagemaker_program': '"iris-dnn-classifier.py"', - 'sagemaker_enable_cloudwatch_metrics': 'false', - 'sagemaker_container_log_level': '"logging.INFO"', - 'sagemaker_job_name': '"neo"', - 'training_steps': '100'}, - 'RoleArn': 'arn:aws:iam::366:role/SageMakerRole', - 'ResourceConfig': - {'VolumeSizeInGB': 30, - 'InstanceCount': 1, - 'InstanceType': 'ml.c4.xlarge'}, - 'StoppingCondition': {'MaxRuntimeInSeconds': 24 * 60 * 60}, - 'TrainingJobName': 'neo', - 'TrainingJobStatus': 'Completed', - 'OutputDataConfig': {'KmsKeyId': '', - 'S3OutputPath': 's3://place/output/neo'}, - 'TrainingJobOutput': {'S3TrainingJobOutput': 's3://here/output.tar.gz'}} + returned_job_description = { + 'AlgorithmSpecification': { + 'TrainingInputMode': 'File', + 'TrainingImage': '1.dkr.ecr.us-west-2.amazonaws.com/sagemaker-other-py2-cpu:1.0.4', + }, + 'HyperParameters': { + 'sagemaker_submit_directory': '"s3://some/sourcedir.tar.gz"', + 'checkpoint_path': '"s3://other/1508872349"', + 'sagemaker_program': '"iris-dnn-classifier.py"', + 'sagemaker_enable_cloudwatch_metrics': 'false', + 'sagemaker_container_log_level': '"logging.INFO"', + 'sagemaker_job_name': '"neo"', + 'training_steps': '100', + }, + 'RoleArn': 'arn:aws:iam::366:role/SageMakerRole', + 'ResourceConfig': { + 'VolumeSizeInGB': 30, + 'InstanceCount': 1, + 'InstanceType': 'ml.c4.xlarge', + }, + 'StoppingCondition': {'MaxRuntimeInSeconds': 24 * 60 * 60}, + 'TrainingJobName': 'neo', + 'TrainingJobStatus': 'Completed', + 'OutputDataConfig': { + 'KmsKeyId': '', + 'S3OutputPath': 's3://place/output/neo', + }, + 'TrainingJobOutput': {'S3TrainingJobOutput': 's3://here/output.tar.gz'}, + } sagemaker_session.sagemaker_client.describe_training_job = Mock(name='describe_training_job', return_value=returned_job_description) @@ -277,17 +285,62 
@@ def test_attach_framework(sagemaker_session): assert framework_estimator.entry_point == 'iris-dnn-classifier.py' -def test_fit_then_fit_again(sagemaker_session): - fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, - enable_cloudwatch_metrics=True) - fw.fit(inputs=s3_input('s3://mybucket/train')) - first_job_name = fw.latest_training_job.name +def test_attach_framework_with_tuning(sagemaker_session): + returned_job_description = { + 'AlgorithmSpecification': { + 'TrainingInputMode': 'File', + 'TrainingImage': '1.dkr.ecr.us-west-2.amazonaws.com/sagemaker-other-py2-cpu:1.0.4' + }, + 'HyperParameters': { + 'sagemaker_submit_directory': '"s3://some/sourcedir.tar.gz"', + 'checkpoint_path': '"s3://other/1508872349"', + 'sagemaker_program': '"iris-dnn-classifier.py"', + 'sagemaker_enable_cloudwatch_metrics': 'false', + 'sagemaker_container_log_level': '"logging.INFO"', + 'sagemaker_job_name': '"neo"', + 'training_steps': '100', + '_tuning_objective_metric': 'Validation-accuracy', + }, + + 'RoleArn': 'arn:aws:iam::366:role/SageMakerRole', + 'ResourceConfig': { + 'VolumeSizeInGB': 30, + 'InstanceCount': 1, + 'InstanceType': 'ml.c4.xlarge' + }, + 'StoppingCondition': { + 'MaxRuntimeInSeconds': 24 * 60 * 60 + }, + 'TrainingJobName': 'neo', + 'TrainingJobStatus': 'Completed', + 'OutputDataConfig': { + 'KmsKeyId': '', + 'S3OutputPath': 's3://place/output/neo' + }, + 'TrainingJobOutput': { + 'S3TrainingJobOutput': 's3://here/output.tar.gz' + } + } - fw.fit(inputs=s3_input('s3://mybucket/train2')) - second_job_name = fw.latest_training_job.name + mock_describe_training_job = Mock(name='describe_training_job', + return_value=returned_job_description) + sagemaker_session.sagemaker_client.describe_training_job = mock_describe_training_job - assert first_job_name != second_job_name + framework_estimator = DummyFramework.attach(training_job_name='neo', + sagemaker_session=sagemaker_session) + assert framework_estimator.latest_training_job.job_name == 'neo' + assert framework_estimator.role == 'arn:aws:iam::366:role/SageMakerRole' + assert framework_estimator.train_instance_count == 1 + assert framework_estimator.train_max_run == 24 * 60 * 60 + assert framework_estimator.input_mode == 'File' + assert framework_estimator.base_job_name == 'neo' + assert framework_estimator.output_path == 's3://place/output/neo' + assert framework_estimator.output_kms_key == '' + hyper_params = framework_estimator.hyperparameters() + assert hyper_params['training_steps'] == '100' + assert hyper_params['_tuning_objective_metric'] == '"Validation-accuracy"' + assert framework_estimator.source_dir == 's3://some/sourcedir.tar.gz' + assert framework_estimator.entry_point == 'iris-dnn-classifier.py' @patch('time.strftime', return_value=TIMESTAMP) @@ -308,57 +361,83 @@ def test_fit_verify_job_name(strftime, sagemaker_session): assert fw.latest_training_job.name == JOB_NAME -def test_fit_force_name(sagemaker_session): +def test_prepare_for_training_unique_job_name_generation(sagemaker_session): + fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, + train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, + enable_cloudwatch_metrics=True) + fw._prepare_for_training() + first_job_name = fw._current_job_name + + fw._prepare_for_training() + second_job_name = fw._current_job_name + + assert first_job_name != second_job_name + + +def 
test_prepare_for_training_force_name(sagemaker_session): fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, base_job_name='some', enable_cloudwatch_metrics=True) - fw.fit(inputs=s3_input('s3://mybucket/train'), job_name='use_it') - assert 'use_it' == fw.latest_training_job.name + fw._prepare_for_training(job_name='use_it') + assert 'use_it' == fw._current_job_name @patch('time.strftime', return_value=TIMESTAMP) -def test_fit_force_generation(strftime, sagemaker_session): +def test_prepare_for_training_force_name_generation(strftime, sagemaker_session): fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, base_job_name='some', enable_cloudwatch_metrics=True) fw.base_job_name = None - fw.fit(inputs=s3_input('s3://mybucket/train')) - assert JOB_NAME == fw.latest_training_job.name + fw._prepare_for_training() + assert JOB_NAME == fw._current_job_name @patch('time.strftime', return_value=TIMESTAMP) def test_init_with_source_dir_s3(strftime, sagemaker_session): - uri = 'bucket/mydata' - fw = DummyFramework(entry_point=SCRIPT_PATH, source_dir='s3://location', role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, enable_cloudwatch_metrics=False) - fw.fit('s3://{}'.format(uri)) + fw._prepare_for_training() + + expected_hyperparameters = { + 'sagemaker_program': SCRIPT_NAME, + 'sagemaker_job_name': JOB_NAME, + 'sagemaker_enable_cloudwatch_metrics': False, + 'sagemaker_container_log_level': logging.INFO, + 'sagemaker_submit_directory': 's3://location', + 'sagemaker_region': 'us-west-2', + } + assert fw._hyperparameters == expected_hyperparameters - expected_hyperparameters = BASE_HP.copy() - expected_hyperparameters['sagemaker_enable_cloudwatch_metrics'] = 'false' - expected_hyperparameters['sagemaker_container_log_level'] = str(logging.INFO) - expected_hyperparameters['sagemaker_submit_directory'] = json.dumps("s3://location") - expected_hyperparameters['sagemaker_region'] = '"us-west-2"' - actual_hyperparameter = sagemaker_session.method_calls[1][2]['hyperparameters'] - assert actual_hyperparameter == expected_hyperparameters +# _TrainingJob 'utils' +def test_start_new(sagemaker_session): + training_job = _TrainingJob(sagemaker_session, JOB_NAME) + hyperparameters = {'mock': 'hyperparameters'} + inputs = 's3://mybucket/train' + estimator = Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, INSTANCE_TYPE, + output_path='s3://bucket/prefix', sagemaker_session=sagemaker_session, + hyperparameters=hyperparameters) -# _TrainingJob 'utils' -def test_format_input_single_unamed_channel(): - input_dict = _TrainingJob._format_inputs_to_input_config('s3://blah/blah') - assert input_dict == [{ - 'ChannelName': 'training', - 'DataSource': { - 'S3DataSource': { - 'S3DataDistributionType': 'FullyReplicated', - 'S3DataType': 'S3Prefix', - 'S3Uri': 's3://blah/blah' - } - } - }] + started_training_job = training_job.start_new(estimator, inputs) + called_args = sagemaker_session.train.call_args + + assert started_training_job.sagemaker_session == sagemaker_session + assert called_args[1]['hyperparameters'] == hyperparameters + sagemaker_session.train.assert_called_once() + + +def test_start_new_not_local_mode_error(sagemaker_session): + training_job = _TrainingJob(sagemaker_session, JOB_NAME) + inputs = 'file://mybucket/train' + + 
estimator = Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, INSTANCE_TYPE, + output_path='s3://bucket/prefix', sagemaker_session=sagemaker_session) + with pytest.raises(ValueError) as error: + training_job.start_new(estimator, inputs) + assert 'File URIs are supported in local mode only. Please use a S3 URI instead.' == str(error) def test_container_log_level(sagemaker_session): @@ -371,82 +450,22 @@ def test_container_log_level(sagemaker_session): assert train_kwargs['hyperparameters']['sagemaker_container_log_level'] == '10' -def test_format_input_multiple_channels(): - input_list = _TrainingJob._format_inputs_to_input_config({'a': 's3://blah/blah', 'b': 's3://foo/bar'}) - expected = [{ - 'ChannelName': 'a', - 'DataSource': { - 'S3DataSource': { - 'S3DataDistributionType': 'FullyReplicated', - 'S3DataType': 'S3Prefix', - 'S3Uri': 's3://blah/blah' - } - } - }, - { - 'ChannelName': 'b', - 'DataSource': { - 'S3DataSource': { - 'S3DataDistributionType': 'FullyReplicated', - 'S3DataType': 'S3Prefix', - 'S3Uri': 's3://foo/bar' - } - } - }] +def test_wait_without_logs(sagemaker_session): + training_job = _TrainingJob(sagemaker_session, JOB_NAME) - # convert back into map for comparison so list order (which is arbitrary) is ignored - assert {c['ChannelName']: c for c in input_list} == {c['ChannelName']: c for c in expected} + training_job.wait(False) + sagemaker_session.wait_for_job.assert_called_once() + assert not sagemaker_session.logs_for_job.called -def test_format_input_s3_input(): - input_dict = _TrainingJob._format_inputs_to_input_config(s3_input('s3://foo/bar', distribution='ShardedByS3Key', - compression='gzip', content_type='whizz', - record_wrapping='bang')) - assert input_dict == [{ - 'CompressionType': 'gzip', - 'ChannelName': 'training', - 'ContentType': 'whizz', - 'DataSource': { - 'S3DataSource': { - 'S3DataType': 'S3Prefix', - 'S3DataDistributionType': 'ShardedByS3Key', - 'S3Uri': 's3://foo/bar'}}, - 'RecordWrapperType': 'bang'}] - - -def test_dict_of_mixed_input_types(): - input_list = _TrainingJob._format_inputs_to_input_config({ - 'a': 's3://foo/bar', - 'b': s3_input('s3://whizz/bang')}) - - expected = [ - {'ChannelName': 'a', - 'DataSource': { - 'S3DataSource': { - 'S3DataDistributionType': 'FullyReplicated', - 'S3DataType': 'S3Prefix', - 'S3Uri': 's3://foo/bar' - } - } - }, - { - 'ChannelName': 'b', - 'DataSource': { - 'S3DataSource': { - 'S3DataDistributionType': 'FullyReplicated', - 'S3DataType': 'S3Prefix', - 'S3Uri': 's3://whizz/bang' - } - } - }] - # convert back into map for comparison so list order (which is arbitrary) is ignored - assert {c['ChannelName']: c for c in input_list} == {c['ChannelName']: c for c in expected} +def test_wait_with_logs(sagemaker_session): + training_job = _TrainingJob(sagemaker_session, JOB_NAME) + training_job.wait() -def test_unsupported_type(): - with pytest.raises(ValueError): - _TrainingJob._format_inputs_to_input_config(55) + sagemaker_session.logs_for_job.assert_called_once() + assert not sagemaker_session.wait_for_job.called def test_unsupported_type_in_dict(): @@ -481,7 +500,6 @@ def test_unsupported_type_in_dict(): 'tags': None, } - HYPERPARAMS = {'x': 1, 'y': 'hello'} STRINGIFIED_HYPERPARAMS = dict([(x, str(y)) for x, y in HYPERPARAMS.items()]) HP_TRAIN_CALL = dict(BASE_TRAIN_CALL) diff --git a/tests/unit/test_fm.py b/tests/unit/test_fm.py index ad6a93c523..9dbb7c014c 100644 --- a/tests/unit/test_fm.py +++ b/tests/unit/test_fm.py @@ -210,31 +210,31 @@ def test_call_fit(base_fit, sagemaker_session): assert base_fit.call_args[0][1] 
== MINI_BATCH_SIZE -def test_call_fit_none_mini_batch_size(sagemaker_session): +def test_prepare_for_training_no_mini_batch_size(sagemaker_session): fm = FactorizationMachines(base_job_name='fm', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') - fm.fit(data) + fm._prepare_for_training(data) -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): fm = FactorizationMachines(base_job_name='fm', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises((TypeError, ValueError)): - fm.fit(data, 'some') + fm._prepare_for_training(data, 'some') -def test_call_fit_wrong_value_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_value_mini_batch_size(sagemaker_session): fm = FactorizationMachines(base_job_name='fm', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - fm.fit(data, 0) + fm._prepare_for_training(data, 0) def test_model_image(sagemaker_session): diff --git a/tests/unit/test_hyperparameter.py b/tests/unit/test_hyperparameter.py index 4b07b7efee..ecccee254a 100644 --- a/tests/unit/test_hyperparameter.py +++ b/tests/unit/test_hyperparameter.py @@ -18,9 +18,9 @@ class Test(object): - blank = Hyperparameter(name="some-name", data_type=int) + blank = Hyperparameter(name='some-name', data_type=int) elizabeth = Hyperparameter(name='elizabeth') - validated = Hyperparameter(name="validated", validate=lambda value: value > 55, data_type=int) + validated = Hyperparameter(name='validated', validate=lambda value: value > 55, data_type=int) def test_blank_access(): @@ -62,7 +62,7 @@ def test_validated(): def test_data_type(): x = Test() x.validated = 66 - assert type(x.validated) == Test.__dict__["validated"].data_type + assert type(x.validated) == Test.__dict__['validated'].data_type def test_from_string(): diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py new file mode 100644 index 0000000000..4692641fb4 --- /dev/null +++ b/tests/unit/test_job.py @@ -0,0 +1,302 @@ +# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
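+
+# Unit tests for the shared _Job helpers that format input, output, resource, and
+# stop-condition configuration for training and tuning jobs.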
+from __future__ import absolute_import + +import pytest +from mock import Mock + +from sagemaker.amazon.amazon_estimator import RecordSet +from sagemaker.estimator import Estimator +from sagemaker.job import _Job +from sagemaker.session import s3_input + +BUCKET_NAME = 's3://mybucket/train' +S3_OUTPUT_PATH = 's3://bucket/prefix' +LOCAL_FILE_NAME = 'file://local/file' +INSTANCE_COUNT = 1 +INSTANCE_TYPE = 'c4.4xlarge' +VOLUME_SIZE = 1 +MAX_RUNTIME = 1 +ROLE = 'DummyRole' +IMAGE_NAME = 'fakeimage' +JOB_NAME = 'fakejob' + + +@pytest.fixture() +def estimator(sagemaker_session): + return Estimator(IMAGE_NAME, ROLE, INSTANCE_COUNT, INSTANCE_TYPE, VOLUME_SIZE, MAX_RUNTIME, + output_path=S3_OUTPUT_PATH, sagemaker_session=sagemaker_session) + + +@pytest.fixture() +def sagemaker_session(): + boto_mock = Mock(name='boto_session') + mock_session = Mock(name='sagemaker_session', boto_session=boto_mock) + mock_session.expand_role = Mock(name='expand_role', return_value=ROLE) + + return mock_session + + +def test_load_config(estimator): + inputs = s3_input(BUCKET_NAME) + + config = _Job._load_config(inputs, estimator) + + assert config['input_config'][0]['DataSource']['S3DataSource']['S3Uri'] == BUCKET_NAME + assert config['role'] == ROLE + assert config['output_config']['S3OutputPath'] == S3_OUTPUT_PATH + assert 'KmsKeyId' not in config['output_config'] + assert config['resource_config']['InstanceCount'] == INSTANCE_COUNT + assert config['resource_config']['InstanceType'] == INSTANCE_TYPE + assert config['resource_config']['VolumeSizeInGB'] == VOLUME_SIZE + assert config['stop_condition']['MaxRuntimeInSeconds'] == MAX_RUNTIME + + +def test_format_inputs_to_input_config_string(): + inputs = BUCKET_NAME + + channels = _Job._format_inputs_to_input_config(inputs) + + assert channels[0]['DataSource']['S3DataSource']['S3Uri'] == inputs + + +def test_format_inputs_to_input_config_s3_input(): + inputs = s3_input(BUCKET_NAME) + + channels = _Job._format_inputs_to_input_config(inputs) + + assert channels[0]['DataSource']['S3DataSource']['S3Uri'] == inputs.config['DataSource'][ + 'S3DataSource']['S3Uri'] + + +def test_format_inputs_to_input_config_dict(): + inputs = {'train': BUCKET_NAME} + + channels = _Job._format_inputs_to_input_config(inputs) + + assert channels[0]['DataSource']['S3DataSource']['S3Uri'] == inputs['train'] + + +def test_format_inputs_to_input_config_record_set(): + inputs = RecordSet(s3_data=BUCKET_NAME, num_records=1, feature_dim=1) + + channels = _Job._format_inputs_to_input_config(inputs) + + assert channels[0]['DataSource']['S3DataSource']['S3Uri'] == inputs.s3_data + assert channels[0]['DataSource']['S3DataSource']['S3DataType'] == inputs.s3_data_type + + +def test_format_inputs_to_input_config_list(): + records = RecordSet(s3_data=BUCKET_NAME, num_records=1, feature_dim=1) + inputs = [records] + + channels = _Job._format_inputs_to_input_config(inputs) + + assert channels[0]['DataSource']['S3DataSource']['S3Uri'] == records.s3_data + assert channels[0]['DataSource']['S3DataSource']['S3DataType'] == records.s3_data_type + + +def test_format_inputs_to_input_config_list_not_all_records(): + records = RecordSet(s3_data=BUCKET_NAME, num_records=1, feature_dim=1) + inputs = [records, 'mock'] + + with pytest.raises(ValueError) as ex: + _Job._format_inputs_to_input_config(inputs) + + assert 'List compatible only with RecordSets.' 
in str(ex)
+
+
+def test_format_inputs_to_input_config_list_duplicate_channel():
+    record = RecordSet(s3_data=BUCKET_NAME, num_records=1, feature_dim=1)
+    inputs = [record, record]
+
+    with pytest.raises(ValueError) as ex:
+        _Job._format_inputs_to_input_config(inputs)
+
+    assert 'Duplicate channels not allowed.' in str(ex)
+
+
+def test_format_input_single_unnamed_channel():
+    input_dict = _Job._format_inputs_to_input_config('s3://blah/blah')
+    assert input_dict == [{
+        'ChannelName': 'training',
+        'DataSource': {
+            'S3DataSource': {
+                'S3DataDistributionType': 'FullyReplicated',
+                'S3DataType': 'S3Prefix',
+                'S3Uri': 's3://blah/blah'
+            }
+        }
+    }]
+
+
+def test_format_input_multiple_channels():
+    input_list = _Job._format_inputs_to_input_config({'a': 's3://blah/blah', 'b': 's3://foo/bar'})
+    expected = [{
+        'ChannelName': 'a',
+        'DataSource': {
+            'S3DataSource': {
+                'S3DataDistributionType': 'FullyReplicated',
+                'S3DataType': 'S3Prefix',
+                'S3Uri': 's3://blah/blah'
+            }
+        }
+    },
+        {
+            'ChannelName': 'b',
+            'DataSource': {
+                'S3DataSource': {
+                    'S3DataDistributionType': 'FullyReplicated',
+                    'S3DataType': 'S3Prefix',
+                    'S3Uri': 's3://foo/bar'
+                }
+            }
+        }]
+
+    # convert back into map for comparison so list order (which is arbitrary) is ignored
+    assert {c['ChannelName']: c for c in input_list} == {c['ChannelName']: c for c in expected}
+
+
+def test_format_input_s3_input():
+    input_dict = _Job._format_inputs_to_input_config(s3_input('s3://foo/bar', distribution='ShardedByS3Key',
+                                                              compression='gzip', content_type='whizz',
+                                                              record_wrapping='bang'))
+    assert input_dict == [{
+        'CompressionType': 'gzip',
+        'ChannelName': 'training',
+        'ContentType': 'whizz',
+        'DataSource': {
+            'S3DataSource': {
+                'S3DataType': 'S3Prefix',
+                'S3DataDistributionType': 'ShardedByS3Key',
+                'S3Uri': 's3://foo/bar'}},
+        'RecordWrapperType': 'bang'}]
+
+
+def test_dict_of_mixed_input_types():
+    input_list = _Job._format_inputs_to_input_config({
+        'a': 's3://foo/bar',
+        'b': s3_input('s3://whizz/bang')})
+
+    expected = [
+        {'ChannelName': 'a',
+         'DataSource': {
+             'S3DataSource': {
+                 'S3DataDistributionType': 'FullyReplicated',
+                 'S3DataType': 'S3Prefix',
+                 'S3Uri': 's3://foo/bar'
+             }
+         }
+         },
+        {
+            'ChannelName': 'b',
+            'DataSource': {
+                'S3DataSource': {
+                    'S3DataDistributionType': 'FullyReplicated',
+                    'S3DataType': 'S3Prefix',
+                    'S3Uri': 's3://whizz/bang'
+                }
+            }
+        }]
+
+    # convert back into map for comparison so list order (which is arbitrary) is ignored
+    assert {c['ChannelName']: c for c in input_list} == {c['ChannelName']: c for c in expected}
+
+
+def test_format_inputs_to_input_config_exception():
+    inputs = 1
+
+    with pytest.raises(ValueError):
+        _Job._format_inputs_to_input_config(inputs)
+
+
+def test_unsupported_type_in_dict():
+    with pytest.raises(ValueError):
+        _Job._format_inputs_to_input_config({'a': 66})
+
+
+def test_format_string_uri_input_string():
+    inputs = BUCKET_NAME
+
+    s3_uri_input = _Job._format_string_uri_input(inputs)
+
+    assert s3_uri_input.config['DataSource']['S3DataSource']['S3Uri'] == inputs
+
+
+def test_format_string_uri_input_string_exception():
+    inputs = 'mybucket/train'
+
+    with pytest.raises(ValueError):
+        _Job._format_string_uri_input(inputs)
+
+
+def test_format_string_uri_input_local_file():
+    file_uri_input = _Job._format_string_uri_input(LOCAL_FILE_NAME)
+
+    assert file_uri_input.config['DataSource']['FileDataSource']['FileUri'] == LOCAL_FILE_NAME
+
+
+def test_format_string_uri_input():
+    inputs = s3_input(BUCKET_NAME)
+
+    s3_uri_input =
_Job._format_string_uri_input(inputs) + + assert s3_uri_input.config['DataSource']['S3DataSource']['S3Uri'] == inputs.config[ + 'DataSource']['S3DataSource']['S3Uri'] + + +def test_format_string_uri_input_exception(): + inputs = 1 + + with pytest.raises(ValueError): + _Job._format_string_uri_input(inputs) + + +def test_prepare_output_config(): + kms_key_id = 'kms_key' + + config = _Job._prepare_output_config(BUCKET_NAME, kms_key_id) + + assert config['S3OutputPath'] == BUCKET_NAME + assert config['KmsKeyId'] == kms_key_id + + +def test_prepare_output_config_kms_key_none(): + s3_path = BUCKET_NAME + kms_key_id = None + + config = _Job._prepare_output_config(s3_path, kms_key_id) + + assert config['S3OutputPath'] == s3_path + assert 'KmsKeyId' not in config + + +def test_prepare_resource_config(): + resource_config = _Job._prepare_resource_config(INSTANCE_COUNT, INSTANCE_TYPE, VOLUME_SIZE) + + assert resource_config['InstanceCount'] == INSTANCE_COUNT + assert resource_config['InstanceType'] == INSTANCE_TYPE + assert resource_config['VolumeSizeInGB'] == VOLUME_SIZE + + +def test_prepare_stop_condition(): + max_run = 1 + + stop_condition = _Job._prepare_stop_condition(max_run) + + assert stop_condition['MaxRuntimeInSeconds'] == max_run + + +def test_name(sagemaker_session): + job = _Job(sagemaker_session, JOB_NAME) + assert job.name == JOB_NAME diff --git a/tests/unit/test_kmeans.py b/tests/unit/test_kmeans.py index d6019e168a..a5b829fa60 100644 --- a/tests/unit/test_kmeans.py +++ b/tests/unit/test_kmeans.py @@ -175,31 +175,33 @@ def test_call_fit(base_fit, sagemaker_session): assert base_fit.call_args[0][1] == MINI_BATCH_SIZE -def test_call_fit_none_mini_batch_size(sagemaker_session): +def test_prepare_for_training_no_mini_batch_size(sagemaker_session): kmeans = KMeans(base_job_name='kmeans', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') - kmeans.fit(data) + kmeans._prepare_for_training(data) + assert kmeans.mini_batch_size == 5000 -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): + +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): kmeans = KMeans(base_job_name='kmeans', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises((TypeError, ValueError)): - kmeans.fit(data, 'some') + kmeans._prepare_for_training(data, 'some') -def test_call_fit_wrong_value_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_value_mini_batch_size(sagemaker_session): kmeans = KMeans(base_job_name='kmeans', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - kmeans.fit(data, 0) + kmeans._prepare_for_training(data, 0) def test_model_image(sagemaker_session): diff --git a/tests/unit/test_lda.py b/tests/unit/test_lda.py index 34e2d0bd8b..5c0528e322 100644 --- a/tests/unit/test_lda.py +++ b/tests/unit/test_lda.py @@ -147,32 +147,32 @@ def test_call_fit(base_fit, sagemaker_session): assert base_fit.call_args[0][1] == MINI_BATCH_SZIE -def test_call_fit_none_mini_batch_size(sagemaker_session): +def test_prepare_for_training_no_mini_batch_size(sagemaker_session): lda = LDA(base_job_name='lda', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = 
RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - lda.fit(data, None) + lda._prepare_for_training(data, None) -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): lda = LDA(base_job_name='lda', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - lda.fit(data, 'some') + lda._prepare_for_training(data, 'some') -def test_call_fit_wrong_value_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_value_mini_batch_size(sagemaker_session): lda = LDA(base_job_name='lda', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - lda.fit(data, 0) + lda._prepare_for_training(data, 0) def test_model_image(sagemaker_session): diff --git a/tests/unit/test_linear_learner.py b/tests/unit/test_linear_learner.py index bdc18c43ac..7724d5fe54 100644 --- a/tests/unit/test_linear_learner.py +++ b/tests/unit/test_linear_learner.py @@ -218,22 +218,17 @@ def test_optional_hyper_parameters_value(sagemaker_session, optional_hyper_param DEFAULT_MINI_BATCH_SIZE = 1000 -@patch('sagemaker.amazon.amazon_estimator.AmazonAlgorithmEstimatorBase.fit') -def test_call_fit_calculate_batch_size_1(base_fit, sagemaker_session): +def test_prepare_for_training_calculate_batch_size_1(sagemaker_session): lr = LinearLearner(base_job_name='lr', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') - lr.fit(data) + lr._prepare_for_training(data) - base_fit.assert_called_once() - assert len(base_fit.call_args[0]) == 2 - assert base_fit.call_args[0][0] == data - assert base_fit.call_args[0][1] == 1 + assert lr.mini_batch_size == 1 -@patch('sagemaker.amazon.amazon_estimator.AmazonAlgorithmEstimatorBase.fit') -def test_call_fit_calculate_batch_size_2(base_fit, sagemaker_session): +def test_prepare_for_training_calculate_batch_size_2(sagemaker_session): lr = LinearLearner(base_job_name='lr', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), @@ -241,12 +236,36 @@ def test_call_fit_calculate_batch_size_2(base_fit, sagemaker_session): feature_dim=FEATURE_DIM, channel='train') - lr.fit(data) + lr._prepare_for_training(data) - base_fit.assert_called_once() - assert len(base_fit.call_args[0]) == 2 - assert base_fit.call_args[0][0] == data - assert base_fit.call_args[0][1] == DEFAULT_MINI_BATCH_SIZE + assert lr.mini_batch_size == DEFAULT_MINI_BATCH_SIZE + + +def test_prepare_for_training_multiple_channel(sagemaker_session): + lr = LinearLearner(base_job_name='lr', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) + + data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), + num_records=10000, + feature_dim=FEATURE_DIM, + channel='train') + + lr._prepare_for_training([data, data]) + + assert lr.mini_batch_size == DEFAULT_MINI_BATCH_SIZE + + +def test_prepare_for_training_multiple_channel_no_train(sagemaker_session): + lr = LinearLearner(base_job_name='lr', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) + + data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), + 
num_records=10000, + feature_dim=FEATURE_DIM, + channel='mock') + + with pytest.raises(ValueError) as ex: + lr._prepare_for_training([data, data]) + + assert 'Must provide train channel.' in str(ex) @patch('sagemaker.amazon.amazon_estimator.AmazonAlgorithmEstimatorBase.fit') diff --git a/tests/unit/test_ntm.py b/tests/unit/test_ntm.py index ceff3e51e3..1f3866968b 100644 --- a/tests/unit/test_ntm.py +++ b/tests/unit/test_ntm.py @@ -193,32 +193,32 @@ def test_call_fit_none_mini_batch_size(sagemaker_session): ntm.fit(data) -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): ntm = NTM(base_job_name="ntm", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet("s3://{}/{}".format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises((TypeError, ValueError)): - ntm.fit(data, "some") + ntm._prepare_for_training(data, "some") -def test_call_fit_wrong_value_lower_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_value_lower_mini_batch_size(sagemaker_session): ntm = NTM(base_job_name="ntm", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet("s3://{}/{}".format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - ntm.fit(data, 0) + ntm._prepare_for_training(data, 0) -def test_call_fit_wrong_value_upper_mini_batch_size(sagemaker_session): +def test_prepare_for_training_wrong_value_upper_mini_batch_size(sagemaker_session): ntm = NTM(base_job_name="ntm", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet("s3://{}/{}".format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') with pytest.raises(ValueError): - ntm.fit(data, 10001) + ntm._prepare_for_training(data, 10001) def test_model_image(sagemaker_session): diff --git a/tests/unit/test_pca.py b/tests/unit/test_pca.py index b70a7b556a..c72cd4fedd 100644 --- a/tests/unit/test_pca.py +++ b/tests/unit/test_pca.py @@ -143,15 +143,17 @@ def test_call_fit(base_fit, sagemaker_session): assert base_fit.call_args[0][1] == MINI_BATCH_SIZE -def test_call_fit_none_mini_batch_size(sagemaker_session): +def test_prepare_for_training_no_mini_batch_size(sagemaker_session): pca = PCA(base_job_name='pca', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') - pca.fit(data) + pca._prepare_for_training(data) + assert pca.mini_batch_size == 1 -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): + +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): pca = PCA(base_job_name='pca', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, @@ -161,6 +163,29 @@ def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): pca.fit(data, 'some') +def test_prepare_for_training_multiple_channel(sagemaker_session): + lr = PCA(base_job_name='lr', sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) + + data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, + channel='train') + + lr._prepare_for_training([data, data]) + + assert lr.mini_batch_size == 1 + + +def test_prepare_for_training_multiple_channel_no_train(sagemaker_session): + lr = PCA(base_job_name='lr', sagemaker_session=sagemaker_session, 
**ALL_REQ_ARGS) + + data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, + channel='mock') + + with pytest.raises(ValueError) as ex: + lr._prepare_for_training([data, data]) + + assert 'Must provide train channel.' in str(ex) + + def test_model_image(sagemaker_session): pca = PCA(sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet('s3://{}/{}'.format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') diff --git a/tests/unit/test_randomcutforest.py b/tests/unit/test_randomcutforest.py index 98306abf73..d265d22ba6 100644 --- a/tests/unit/test_randomcutforest.py +++ b/tests/unit/test_randomcutforest.py @@ -141,16 +141,18 @@ def test_call_fit(base_fit, sagemaker_session): assert base_fit.call_args[0][1] == MINI_BATCH_SIZE -def test_call_fit_none_mini_batch_size(sagemaker_session): +def test_prepare_for_training_no_mini_batch_size(sagemaker_session): randomcutforest = RandomCutForest(base_job_name="randomcutforest", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) data = RecordSet("s3://{}/{}".format(BUCKET_NAME, PREFIX), num_records=1, feature_dim=FEATURE_DIM, channel='train') - randomcutforest.fit(data) + randomcutforest._prepare_for_training(data) + assert randomcutforest.mini_batch_size == MINI_BATCH_SIZE -def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): + +def test_prepare_for_training_wrong_type_mini_batch_size(sagemaker_session): randomcutforest = RandomCutForest(base_job_name="randomcutforest", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) @@ -158,10 +160,10 @@ def test_call_fit_wrong_type_mini_batch_size(sagemaker_session): channel='train') with pytest.raises((TypeError, ValueError)): - randomcutforest.fit(data, 1234) + randomcutforest._prepare_for_training(data, 1234) -def test_call_fit_feature_dim_greater_than_max_allowed(sagemaker_session): +def test_prepare_for_training_feature_dim_greater_than_max_allowed(sagemaker_session): randomcutforest = RandomCutForest(base_job_name="randomcutforest", sagemaker_session=sagemaker_session, **ALL_REQ_ARGS) @@ -169,7 +171,7 @@ def test_call_fit_feature_dim_greater_than_max_allowed(sagemaker_session): channel='train') with pytest.raises((TypeError, ValueError)): - randomcutforest.fit(data) + randomcutforest._prepare_for_training(data) def test_model_image(sagemaker_session): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 8f7edf61dc..3be8b24e21 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -22,6 +22,8 @@ from botocore.exceptions import ClientError +from sagemaker.session import _tuning_job_status + REGION = 'us-west-2' @@ -229,6 +231,40 @@ def test_train_pack_to_request(sagemaker_session): 'create_training_job', (), DEFAULT_EXPECTED_TRAIN_JOB_ARGS) +def test_stop_tuning_job(sagemaker_session): + sms = sagemaker_session + sms.sagemaker_client.stop_hyper_parameter_tuning_job = Mock(name='stop_hyper_parameter_tuning_job') + + sagemaker_session.stop_tuning_job(JOB_NAME) + sms.sagemaker_client.stop_hyper_parameter_tuning_job.assert_called_once_with(HyperParameterTuningJobName=JOB_NAME) + + +def test_stop_tuning_job_client_error_already_stopped(sagemaker_session): + sms = sagemaker_session + exception = ClientError({'Error': {'Code': 'ValidationException'}}, 'Operation') + sms.sagemaker_client.stop_hyper_parameter_tuning_job = Mock(name='stop_hyper_parameter_tuning_job', + side_effect=exception) + sagemaker_session.stop_tuning_job(JOB_NAME) + + 
sms.sagemaker_client.stop_hyper_parameter_tuning_job.assert_called_once_with(HyperParameterTuningJobName=JOB_NAME) + + +def test_stop_tuning_job_client_error(sagemaker_session): + error_response = {'Error': {'Code': 'MockException', 'Message': 'MockMessage'}} + operation = 'Operation' + exception = ClientError(error_response, operation) + + sms = sagemaker_session + sms.sagemaker_client.stop_hyper_parameter_tuning_job = Mock(name='stop_hyper_parameter_tuning_job', + side_effect=exception) + + with pytest.raises(ClientError) as e: + sagemaker_session.stop_tuning_job(JOB_NAME) + + sms.sagemaker_client.stop_hyper_parameter_tuning_job.assert_called_once_with(HyperParameterTuningJobName=JOB_NAME) + assert 'An error occurred (MockException) when calling the Operation operation: MockMessage' in str(e) + + def test_train_pack_to_request_with_optional_params(sagemaker_session): in_config = [{ 'ChannelName': 'training', @@ -449,3 +485,32 @@ def test_endpoint_from_production_variants(sagemaker_session): 'InitialVariantWeight': 1, 'InitialInstanceCount': 1, 'VariantName': 'AllTraffic'}]) + + +def test_wait_for_tuning_job(sagemaker_session): + hyperparameter_tuning_job_desc = {'HyperParameterTuningJobStatus': 'Completed'} + sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock( + name='describe_hyper_parameter_tuning_job', return_value=hyperparameter_tuning_job_desc) + + result = sagemaker_session.wait_for_tuning_job(JOB_NAME) + assert result['HyperParameterTuningJobStatus'] == 'Completed' + + +def test_tune_job_status(sagemaker_session): + hyperparameter_tuning_job_desc = {'HyperParameterTuningJobStatus': 'Completed'} + sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock( + name='describe_hyper_parameter_tuning_job', return_value=hyperparameter_tuning_job_desc) + + result = _tuning_job_status(sagemaker_session.sagemaker_client, JOB_NAME) + + assert result['HyperParameterTuningJobStatus'] == 'Completed' + + +def test_tune_job_status_none(sagemaker_session): + hyperparameter_tuning_job_desc = {'HyperParameterTuningJobStatus': 'InProgress'} + sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock( + name='describe_hyper_parameter_tuning_job', return_value=hyperparameter_tuning_job_desc) + + result = _tuning_job_status(sagemaker_session.sagemaker_client, JOB_NAME) + + assert result is None diff --git a/tests/unit/test_tuner.py b/tests/unit/test_tuner.py new file mode 100644 index 0000000000..a1f2d21515 --- /dev/null +++ b/tests/unit/test_tuner.py @@ -0,0 +1,548 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import
+
+import copy
+import json
+
+import pytest
+from mock import Mock
+
+from sagemaker import RealTimePredictor
+from sagemaker.amazon.pca import PCA
+from sagemaker.amazon.amazon_estimator import RecordSet
+from sagemaker.estimator import Estimator
+from sagemaker.tuner import _ParameterRange, ContinuousParameter, IntegerParameter, CategoricalParameter, \
+    HyperparameterTuner, _TuningJob
+from sagemaker.mxnet import MXNet
+MODEL_DATA = "s3://bucket/model.tar.gz"
+
+JOB_NAME = 'tuning_job'
+REGION = 'us-west-2'
+BUCKET_NAME = 'Some-Bucket'
+ROLE = 'myrole'
+IMAGE_NAME = 'image'
+
+TRAIN_INSTANCE_COUNT = 1
+TRAIN_INSTANCE_TYPE = 'ml.c4.xlarge'
+NUM_COMPONENTS = 5
+
+SCRIPT_NAME = 'my_script.py'
+FRAMEWORK_VERSION = '1.0.0'
+
+INPUTS = 's3://mybucket/train'
+OBJECTIVE_METRIC_NAME = 'mock_metric'
+HYPERPARAMETER_RANGES = {'validated': ContinuousParameter(0, 5),
+                         'elizabeth': IntegerParameter(0, 5),
+                         'blank': CategoricalParameter([0, 5])}
+METRIC_DEFINITIONS = 'mock_metric_definitions'
+
+TUNING_JOB_DETAILS = {
+    'HyperParameterTuningJobConfig': {
+        'ResourceLimits': {
+            'MaxParallelTrainingJobs': 1,
+            'MaxNumberOfTrainingJobs': 1
+        },
+        'HyperParameterTuningJobObjective': {
+            'MetricName': OBJECTIVE_METRIC_NAME,
+            'Type': 'Minimize'
+        },
+        'Strategy': 'Bayesian',
+        'ParameterRanges': {
+            'CategoricalParameterRanges': [],
+            'ContinuousParameterRanges': [],
+            'IntegerParameterRanges': [
+                {
+                    'MaxValue': '100',
+                    'Name': 'mini_batch_size',
+                    'MinValue': '10',
+                },
+            ]
+        }
+    },
+    'HyperParameterTuningJobName': JOB_NAME,
+    'TrainingJobDefinition': {
+        'RoleArn': ROLE,
+        'StaticHyperParameters': {
+            'num_components': '1',
+            '_tuning_objective_metric': 'train:throughput',
+            'feature_dim': '784',
+            'sagemaker_estimator_module': '"sagemaker.amazon.pca"',
+            'sagemaker_estimator_class_name': '"PCA"',
+        },
+        'ResourceConfig': {
+            'VolumeSizeInGB': 30,
+            'InstanceType': 'ml.c4.xlarge',
+            'InstanceCount': 1
+        },
+        'AlgorithmSpecification': {
+            'TrainingImage': IMAGE_NAME,
+            'TrainingInputMode': 'File',
+            'MetricDefinitions': METRIC_DEFINITIONS,
+        },
+        'InputDataConfig': [
+            {
+                'ChannelName': 'train',
+                'DataSource': {
+                    'S3DataSource': {
+                        'S3DataDistributionType': 'ShardedByS3Key',
+                        'S3Uri': INPUTS,
+                        'S3DataType': 'ManifestFile'
+                    }
+                }
+            }
+        ],
+        'StoppingCondition': {
+            'MaxRuntimeInSeconds': 86400
+        },
+        'OutputDataConfig': {
+            'S3OutputPath': BUCKET_NAME,
+        }
+    },
+    'TrainingJobCounters': {
+        'ClientError': 0,
+        'Completed': 1,
+        'InProgress': 0,
+        'Fault': 0,
+        'Stopped': 0
+    },
+    'HyperParameterTuningEndTime': 1526605831.0,
+    'CreationTime': 1526605605.0,
+    'HyperParameterTuningJobArn': 'arn:tuning_job',
+}
+
+
+@pytest.fixture()
+def sagemaker_session():
+    boto_mock = Mock(name='boto_session', region_name=REGION)
+    sms = Mock(name='sagemaker_session', boto_session=boto_mock)
+    sms.boto_region_name = REGION
+    sms.default_bucket = Mock(name='default_bucket', return_value=BUCKET_NAME)
+    sms.config = None
+    return sms
+
+
+@pytest.fixture()
+def estimator(sagemaker_session):
+    return Estimator(IMAGE_NAME, ROLE, TRAIN_INSTANCE_COUNT, TRAIN_INSTANCE_TYPE, output_path='s3://bucket/prefix',
+                     sagemaker_session=sagemaker_session)
+
+
+@pytest.fixture()
+def tuner(estimator):
+    return HyperparameterTuner(estimator, OBJECTIVE_METRIC_NAME, HYPERPARAMETER_RANGES, METRIC_DEFINITIONS)
+
+
+def test_prepare_for_training(tuner):
+    static_hyperparameters = {'validated': 1, 'another_one': 0}
+    tuner.estimator.set_hyperparameters(**static_hyperparameters)
+    
tuner._prepare_for_training()
+
+    assert tuner._current_job_name.startswith(IMAGE_NAME)
+
+    assert len(tuner.static_hyperparameters) == 3
+    assert tuner.static_hyperparameters['another_one'] == '0'
+
+    class_name = json.dumps(tuner.estimator.__class__.__name__)
+    assert tuner.static_hyperparameters['sagemaker_estimator_class_name'] == class_name
+    module = json.dumps(tuner.estimator.__module__)
+    assert tuner.static_hyperparameters['sagemaker_estimator_module'] == module
+
+
+def test_prepare_for_training_with_job_name(tuner):
+    static_hyperparameters = {'validated': 1, 'another_one': 0}
+    tuner.estimator.set_hyperparameters(**static_hyperparameters)
+
+    tuner._prepare_for_training(job_name='some-other-job-name')
+    assert tuner._current_job_name == 'some-other-job-name'
+
+
+def test_validate_parameter_ranges_number_validation_error(sagemaker_session):
+    pca = PCA(ROLE, TRAIN_INSTANCE_COUNT, TRAIN_INSTANCE_TYPE, NUM_COMPONENTS,
+              base_job_name='pca', sagemaker_session=sagemaker_session)
+
+    invalid_hyperparameter_ranges = {'num_components': IntegerParameter(-1, 2)}
+
+    with pytest.raises(ValueError) as e:
+        HyperparameterTuner(estimator=pca, objective_metric_name=OBJECTIVE_METRIC_NAME,
+                            hyperparameter_ranges=invalid_hyperparameter_ranges, metric_definitions=METRIC_DEFINITIONS)
+
+    assert 'Value must be an integer greater than zero' in str(e)
+
+
+def test_validate_parameter_ranges_string_value_validation_error(sagemaker_session):
+    pca = PCA(ROLE, TRAIN_INSTANCE_COUNT, TRAIN_INSTANCE_TYPE, NUM_COMPONENTS,
+              base_job_name='pca', sagemaker_session=sagemaker_session)
+
+    invalid_hyperparameter_ranges = {'algorithm_mode': CategoricalParameter([0, 5])}
+
+    with pytest.raises(ValueError) as e:
+        HyperparameterTuner(estimator=pca, objective_metric_name=OBJECTIVE_METRIC_NAME,
+                            hyperparameter_ranges=invalid_hyperparameter_ranges, metric_definitions=METRIC_DEFINITIONS)
+
+    assert 'Value must be one of "regular" and "randomized"' in str(e)
+
+
+def test_fit_pca(sagemaker_session, tuner):
+    pca = PCA(ROLE, TRAIN_INSTANCE_COUNT, TRAIN_INSTANCE_TYPE, NUM_COMPONENTS,
+              base_job_name='pca', sagemaker_session=sagemaker_session)
+
+    pca.algorithm_mode = 'randomized'
+    pca.subtract_mean = True
+    pca.extra_components = 5
+
+    tuner.estimator = pca
+
+    tags = [{'Name': 'some-tag-without-a-value'}]
+    tuner.tags = tags
+
+    hyperparameter_ranges = {'num_components': IntegerParameter(2, 4),
+                             'algorithm_mode': CategoricalParameter(['regular', 'randomized'])}
+    tuner._hyperparameter_ranges = hyperparameter_ranges
+
+    records = RecordSet(s3_data=INPUTS, num_records=1, feature_dim=1)
+    tuner.fit(records, mini_batch_size=9999)
+
+    _, _, tune_kwargs = sagemaker_session.tune.mock_calls[0]
+
+    assert len(tune_kwargs['static_hyperparameters']) == 4
+    assert tune_kwargs['static_hyperparameters']['extra_components'] == '5'
+    assert len(tune_kwargs['parameter_ranges']['IntegerParameterRanges']) == 1
+    assert tune_kwargs['job_name'].startswith('pca')
+    assert tune_kwargs['tags'] == tags
+    assert tuner.estimator.mini_batch_size == 9999
+
+
+def test_attach_tuning_job_with_estimator_from_hyperparameters(sagemaker_session):
+    job_details = copy.deepcopy(TUNING_JOB_DETAILS)
+    sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(name='describe_tuning_job',
+                                                                                  return_value=job_details)
+    tuner = HyperparameterTuner.attach(JOB_NAME, sagemaker_session=sagemaker_session)
+
+    assert tuner.latest_tuning_job.name == JOB_NAME
+    assert tuner.objective_metric_name == OBJECTIVE_METRIC_NAME
+    assert tuner.max_jobs == 1
+    assert tuner.max_parallel_jobs == 1
+    assert tuner.metric_definitions == METRIC_DEFINITIONS
+    assert tuner.strategy == 'Bayesian'
+    assert tuner.objective_type == 'Minimize'
+
+    assert isinstance(tuner.estimator, PCA)
+    assert tuner.estimator.role == ROLE
+    assert tuner.estimator.train_instance_count == 1
+    assert tuner.estimator.train_max_run == 24 * 60 * 60
+    assert tuner.estimator.input_mode == 'File'
+    assert tuner.estimator.output_path == BUCKET_NAME
+    assert tuner.estimator.output_kms_key == ''
+
+    assert '_tuning_objective_metric' not in tuner.estimator.hyperparameters()
+    assert tuner.estimator.hyperparameters()['num_components'] == '1'
+
+
+def test_attach_tuning_job_with_job_details(sagemaker_session):
+    job_details = copy.deepcopy(TUNING_JOB_DETAILS)
+    HyperparameterTuner.attach(JOB_NAME, sagemaker_session=sagemaker_session, job_details=job_details)
+    sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job.assert_not_called()
+
+
+def test_attach_tuning_job_with_estimator_from_image(sagemaker_session):
+    job_details = copy.deepcopy(TUNING_JOB_DETAILS)
+    job_details['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = '1111.amazonaws.com/pca:1'
+    sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(name='describe_tuning_job',
+                                                                                  return_value=job_details)
+
+    tuner = HyperparameterTuner.attach(JOB_NAME, sagemaker_session=sagemaker_session)
+    assert isinstance(tuner.estimator, PCA)
+
+
+def test_attach_tuning_job_with_estimator_from_kwarg(sagemaker_session):
+    job_details = copy.deepcopy(TUNING_JOB_DETAILS)
+    sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(name='describe_tuning_job',
+                                                                                  return_value=job_details)
+    tuner = HyperparameterTuner.attach(JOB_NAME, sagemaker_session=sagemaker_session,
+                                       estimator_cls='sagemaker.estimator.Estimator')
+    assert isinstance(tuner.estimator, Estimator)
+
+
+def test_attach_with_no_specified_estimator(sagemaker_session):
+    job_details = copy.deepcopy(TUNING_JOB_DETAILS)
+    del job_details['TrainingJobDefinition']['StaticHyperParameters']['sagemaker_estimator_module']
+    del job_details['TrainingJobDefinition']['StaticHyperParameters']['sagemaker_estimator_class_name']
+    sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(name='describe_tuning_job',
+                                                                                  return_value=job_details)
+
+    tuner = HyperparameterTuner.attach(JOB_NAME, sagemaker_session=sagemaker_session)
+    assert isinstance(tuner.estimator, Estimator)
+
+
+def test_serialize_parameter_ranges(tuner):
+    hyperparameter_ranges = tuner.hyperparameter_ranges()
+
+    for key, value in HYPERPARAMETER_RANGES.items():
+        assert hyperparameter_ranges[value.__name__ + 'ParameterRanges'][0]['Name'] == key
+
+
+def test_analytics(tuner):
+    tuner.latest_tuning_job = _TuningJob(tuner.sagemaker_session, 'testjob')
+    tuner_analytics = tuner.analytics()
+    assert tuner_analytics is not None
+    assert tuner_analytics.name.find('testjob') > -1
+
+
+def test_serialize_categorical_ranges_for_frameworks(sagemaker_session, tuner):
+    tuner.estimator = MXNet(entry_point=SCRIPT_NAME,
+                            role=ROLE,
+                            framework_version=FRAMEWORK_VERSION,
+                            train_instance_count=TRAIN_INSTANCE_COUNT,
+                            train_instance_type=TRAIN_INSTANCE_TYPE,
+                            sagemaker_session=sagemaker_session)
+
+    hyperparameter_ranges = tuner.hyperparameter_ranges()
+
+    assert hyperparameter_ranges['CategoricalParameterRanges'][0]['Name'] == 'blank'
+    assert hyperparameter_ranges['CategoricalParameterRanges'][0]['Values'] == ['"0"', '"5"']
+
+
+def 
test_serialize_nonexistent_parameter_ranges(tuner): + temp_hyperparameter_ranges = HYPERPARAMETER_RANGES.copy() + parameter_type = temp_hyperparameter_ranges['validated'].__name__ + + temp_hyperparameter_ranges['validated'] = None + tuner._hyperparameter_ranges = temp_hyperparameter_ranges + + ranges = tuner.hyperparameter_ranges() + assert len(ranges.keys()) == 3 + assert not ranges[parameter_type + 'ParameterRanges'] + + +def test_stop_tuning_job(sagemaker_session, tuner): + sagemaker_session.stop_tuning_job = Mock(name='stop_hyper_parameter_tuning_job') + tuner.latest_tuning_job = _TuningJob(sagemaker_session, JOB_NAME) + + tuner.stop_tuning_job() + + sagemaker_session.stop_tuning_job.assert_called_once_with(name=JOB_NAME) + + +def test_stop_tuning_job_no_tuning_job(tuner): + with pytest.raises(ValueError) as e: + tuner.stop_tuning_job() + assert 'No tuning job available' in str(e) + + +def test_best_tuning_job(tuner): + tuning_job_description = {'BestTrainingJob': {'TrainingJobName': JOB_NAME}} + + tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock( + name='describe_hyper_parameter_tuning_job', return_value=tuning_job_description) + + tuner.latest_tuning_job = _TuningJob(tuner.estimator.sagemaker_session, JOB_NAME) + best_training_job = tuner.best_training_job() + + assert best_training_job == JOB_NAME + tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job.assert_called_once_with( + HyperParameterTuningJobName=JOB_NAME) + + +def test_best_tuning_job_no_latest_job(tuner): + with pytest.raises(Exception) as e: + tuner.best_training_job() + + assert 'No tuning job available' in str(e) + + +def test_best_tuning_job_no_best_job(tuner): + tuning_job_description = {'BestTrainingJob': {'Mock': None}} + + tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock( + name='describe_hyper_parameter_tuning_job', return_value=tuning_job_description) + + tuner.latest_tuning_job = _TuningJob(tuner.estimator.sagemaker_session, JOB_NAME) + + with pytest.raises(Exception) as e: + tuner.best_training_job() + + tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job.assert_called_once_with( + HyperParameterTuningJobName=JOB_NAME) + assert 'Best training job not available for tuning job:' in str(e) + + +def test_deploy_default(tuner): + returned_training_job_description = { + 'AlgorithmSpecification': { + 'TrainingInputMode': 'File', + 'TrainingImage': IMAGE_NAME + }, + 'HyperParameters': { + 'sagemaker_submit_directory': '"s3://some/sourcedir.tar.gz"', + 'checkpoint_path': '"s3://other/1508872349"', + 'sagemaker_program': '"iris-dnn-classifier.py"', + 'sagemaker_enable_cloudwatch_metrics': 'false', + 'sagemaker_container_log_level': '"logging.INFO"', + 'sagemaker_job_name': '"neo"', + 'training_steps': '100', + '_tuning_objective_metric': 'Validation-accuracy', + }, + + 'RoleArn': ROLE, + 'ResourceConfig': { + 'VolumeSizeInGB': 30, + 'InstanceCount': 1, + 'InstanceType': 'ml.c4.xlarge' + }, + 'StoppingCondition': { + 'MaxRuntimeInSeconds': 24 * 60 * 60 + }, + 'TrainingJobName': 'neo', + 'TrainingJobStatus': 'Completed', + 'OutputDataConfig': { + 'KmsKeyId': '', + 'S3OutputPath': 's3://place/output/neo' + }, + 'TrainingJobOutput': { + 'S3TrainingJobOutput': 's3://here/output.tar.gz' + }, + 'ModelArtifacts': { + 'S3ModelArtifacts': MODEL_DATA + } + } + tuning_job_description = {'BestTrainingJob': {'TrainingJobName': JOB_NAME}} + + 
tuner.estimator.sagemaker_session.sagemaker_client.describe_training_job = \
+        Mock(name='describe_training_job', return_value=returned_training_job_description)
+    tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(
+        name='describe_hyper_parameter_tuning_job', return_value=tuning_job_description)
+    tuner.estimator.sagemaker_session.log_for_jobs = Mock(name='log_for_jobs')
+
+    tuner.latest_tuning_job = _TuningJob(tuner.estimator.sagemaker_session, JOB_NAME)
+    predictor = tuner.deploy(TRAIN_INSTANCE_COUNT, TRAIN_INSTANCE_TYPE)
+
+    tuner.estimator.sagemaker_session.create_model.assert_called_once()
+    args = tuner.estimator.sagemaker_session.create_model.call_args[0]
+    assert args[0].startswith(IMAGE_NAME)
+    assert args[1] == ROLE
+    assert args[2]['Image'] == IMAGE_NAME
+    assert args[2]['ModelDataUrl'] == MODEL_DATA
+
+    assert isinstance(predictor, RealTimePredictor)
+    assert predictor.endpoint.startswith(JOB_NAME)
+    assert predictor.sagemaker_session == tuner.estimator.sagemaker_session
+
+
+def test_wait(tuner):
+    tuner.latest_tuning_job = _TuningJob(tuner.estimator.sagemaker_session, JOB_NAME)
+    tuner.estimator.sagemaker_session.wait_for_tuning_job = Mock(name='wait_for_tuning_job')
+
+    tuner.wait()
+
+    tuner.estimator.sagemaker_session.wait_for_tuning_job.assert_called_once_with(JOB_NAME)
+
+
+def test_delete_endpoint(tuner):
+    tuner.latest_tuning_job = _TuningJob(tuner.estimator.sagemaker_session, JOB_NAME)
+
+    tuning_job_description = {'BestTrainingJob': {'TrainingJobName': JOB_NAME}}
+    tuner.estimator.sagemaker_session.sagemaker_client.describe_hyper_parameter_tuning_job = Mock(
+        name='describe_hyper_parameter_tuning_job', return_value=tuning_job_description)
+
+    tuner.delete_endpoint()
+    tuner.sagemaker_session.delete_endpoint.assert_called_with(JOB_NAME)
+
+
+#################################################################################
+# _ParameterRange Tests
+
+def test_continuous_parameter():
+    cont_param = ContinuousParameter(0.1, 1e-2)
+    assert isinstance(cont_param, _ParameterRange)
+    assert cont_param.__name__ == 'Continuous'
+
+
+def test_continuous_parameter_ranges():
+    cont_param = ContinuousParameter(0.1, 1e-2)
+    ranges = cont_param.as_tuning_range('some')
+    assert len(ranges.keys()) == 3
+    assert ranges['Name'] == 'some'
+    assert ranges['MinValue'] == '0.1'
+    assert ranges['MaxValue'] == '0.01'
+
+
+def test_integer_parameter():
+    int_param = IntegerParameter(1, 2)
+    assert isinstance(int_param, _ParameterRange)
+    assert int_param.__name__ == 'Integer'
+
+
+def test_integer_parameter_ranges():
+    int_param = IntegerParameter(1, 2)
+    ranges = int_param.as_tuning_range('some')
+    assert len(ranges.keys()) == 3
+    assert ranges['Name'] == 'some'
+    assert ranges['MinValue'] == '1'
+    assert ranges['MaxValue'] == '2'
+
+
+def test_categorical_parameter_list():
+    cat_param = CategoricalParameter(['a', 'z'])
+    assert isinstance(cat_param, _ParameterRange)
+    assert cat_param.__name__ == 'Categorical'
+
+
+def test_categorical_parameter_list_ranges():
+    cat_param = CategoricalParameter([1, 10])
+    ranges = cat_param.as_tuning_range('some')
+    assert len(ranges.keys()) == 2
+    assert ranges['Name'] == 'some'
+    assert ranges['Values'] == ['1', '10']
+
+
+def test_categorical_parameter_value():
+    cat_param = CategoricalParameter('a')
+    assert isinstance(cat_param, _ParameterRange)
+
+
+def test_categorical_parameter_value_ranges():
+    cat_param = CategoricalParameter('a')
+    ranges = cat_param.as_tuning_range('some')
+    assert len(ranges.keys()) 
== 2
+    assert ranges['Name'] == 'some'
+    assert ranges['Values'] == ['a']
+
+
+#################################################################################
+# _TuningJob Tests
+
+def test_start_new(tuner, sagemaker_session):
+    tuning_job = _TuningJob(sagemaker_session, JOB_NAME)
+
+    tuner.static_hyperparameters = {}
+    started_tuning_job = tuning_job.start_new(tuner, INPUTS)
+
+    assert started_tuning_job.sagemaker_session == sagemaker_session
+    sagemaker_session.tune.assert_called_once()
+
+
+def test_stop(sagemaker_session):
+    tuning_job = _TuningJob(sagemaker_session, JOB_NAME)
+    tuning_job.stop()
+
+    sagemaker_session.stop_tuning_job.assert_called_once_with(name=JOB_NAME)
+
+
+def test_tuning_job_wait(sagemaker_session):
+    sagemaker_session.wait_for_tuning_job = Mock(name='wait_for_tuning_job')
+
+    tuning_job = _TuningJob(sagemaker_session, JOB_NAME)
+    tuning_job.wait()
+
+    sagemaker_session.wait_for_tuning_job.assert_called_once_with(JOB_NAME)
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 5004529a08..de40983895 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"). You
@@ -12,7 +14,12 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-from sagemaker.utils import get_config_value
+import pytest
+from mock import patch
+
+from sagemaker.utils import get_config_value, name_from_base, to_str, DeferredError
+
+NAME = 'base_name'
 
 
 def test_get_config_value():
@@ -32,3 +39,41 @@ def test_get_config_value():
 
     assert get_config_value('does_not.exist', config) is None
     assert get_config_value('other.key', None) is None
+
+
+def test_deferred_error():
+    de = DeferredError(ImportError("pretend the import failed"))
+    with pytest.raises(ImportError) as _:  # noqa: F841
+        de.something()
+
+
+def test_bad_import():
+    try:
+        import pandas_is_not_installed as pd
+    except ImportError as e:
+        pd = DeferredError(e)
+    assert pd is not None
+    with pytest.raises(ImportError) as _:  # noqa: F841
+        pd.DataFrame()
+
+
+@patch('sagemaker.utils.sagemaker_timestamp')
+def test_name_from_base(sagemaker_timestamp):
+    name_from_base(NAME, short=False)
+    assert sagemaker_timestamp.call_count == 1
+
+
+@patch('sagemaker.utils.sagemaker_short_timestamp')
+def test_name_from_base_short(sagemaker_short_timestamp):
+    name_from_base(NAME, short=True)
+    assert sagemaker_short_timestamp.call_count == 1
+
+
+def test_to_str_with_native_string():
+    value = 'some string'
+    assert to_str(value) == value
+
+
+def test_to_str_with_unicode_string():
+    value = u'åñøthér strîng'
+    assert to_str(value) == value
diff --git a/tox.ini b/tox.ini
index c49dbe1193..8ddd405f8a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -62,6 +62,7 @@ deps =
     pytest-xdist
     tensorflow
     mock
+    pandas
    contextlib2
    teamcity-messages
    awslogs