diff --git a/README.rst b/README.rst index e06dea8b3c..c80ef02d4b 100644 --- a/README.rst +++ b/README.rst @@ -97,7 +97,7 @@ SageMaker Python SDK provides several high-level abstractions for working with A - **Estimators**: Encapsulate training on SageMaker. Can be ``fit()`` to run training, then the resulting model ``deploy()`` ed to a SageMaker Endpoint. - **Models**: Encapsulate built ML models. Can be ``deploy()`` ed to a SageMaker Endpoint. - **Predictors**: Provide real-time inference and transformation using Python data-types against a SageMaker Endpoint. -- **Session**: Provides a collection of convience methods for working with SageMaker resources. +- **Session**: Provides a collection of convenience methods for working with SageMaker resources. Estimator and Model implementations for MXNet, TensorFlow, and Amazon ML algorithms are included. There's also an Estimator that runs SageMaker compatible custom Docker containers, allowing you to run your own ML algorithms via SageMaker Python SDK. @@ -1150,6 +1150,7 @@ Optional arguments - ``wait (bool)``: Defaults to True, whether to block and wait for the training script to complete before returning. + If set to False, it will return immediately, and can later be attached to. - ``logs (bool)``: Defaults to True, whether to show logs produced by training job in the Python session. Only meaningful when wait is True. - ``run_tensorboard_locally (bool)``: Defaults to False. Executes TensorBoard in a different @@ -1178,9 +1179,25 @@ the ``TensorFlow`` estimator parameter ``training_steps`` is finished or when th job execution time reaches the ``TensorFlow`` estimator parameter ``train_max_run``. When the training job finishes, a `TensorFlow serving `_ -with the result of the training is generated and saved to the S3 location define by +with the result of the training is generated and saved to the S3 location defined by the ``TensorFlow`` estimator parameter ``output_path``. 
+ +If the ``wait=False`` flag is passed to ``fit``, then it will return immediately. The training job will continue running +asynchronously. At a later time, a TensorFlow Estimator can be obtained by attaching to the existing training job. If +the training job is not finished, attaching will start showing the standard output of training and wait until it completes. +After attaching, the estimator can be deployed as usual. + +.. code:: python + + tf_estimator.fit(your_input_data, wait=False) + training_job_name = tf_estimator.latest_training_job.name + + # after some time, or in a separate Python notebook, we can attach to it again. + + tf_estimator = TensorFlow.attach(training_job_name=training_job_name) + + The evaluation process """""""""""""""""""""" @@ -1244,6 +1261,8 @@ You can access TensorBoard locally at http://localhost:6006 or using your SakeMa `https*workspace_base_url*proxy/6006/ `_ (TensorBoard will not work if you forget to put the slash, '/', in end of the url). If TensorBoard started on a different port, adjust these URLs to match. +Note that TensorBoard is not supported when passing ``wait=False`` to ``fit``. + Deploying TensorFlow Serving models ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index cc92fec0f4..27538edf59 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -65,6 +65,31 @@ def data_location(self, data_location): data_location = data_location + '/' self._data_location = data_location + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor + + Args: + job_details: the returned job details from a describe_training_job API call. 
+ + Returns: + dictionary: The transformed init_params + + """ + init_params = super(AmazonAlgorithmEstimatorBase, cls)._prepare_init_params_from_job_description(job_details) + + # The hyperparam names may not be the same as the class attribute that holds them, + # for instance: local_lloyd_init_method is called local_init_method. We need to map these + # and pass the correct name to the constructor. + for attribute, value in cls.__dict__.items(): + if isinstance(value, hp): + if value.name in init_params['hyperparameters']: + init_params[attribute] = init_params['hyperparameters'][value.name] + + del init_params['hyperparameters'] + del init_params['image'] + return init_params + def fit(self, records, mini_batch_size=None, **kwargs): """Fit this Estimator on serialized Record objects, stored in S3. diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 2bfce13f59..c672703315 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -152,8 +152,60 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): self.latest_training_job = _TrainingJob.start_new(self, inputs) if wait: self.latest_training_job.wait(logs=logs) - else: - raise NotImplemented('Asynchronous fit not available') + + @classmethod + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ + raise NotImplementedError() + + @classmethod + def attach(cls, training_job_name, sagemaker_session=None, job_details=None): + """Attach to an existing training job. 
+ + Create an Estimator bound to an existing training job, each subclass is responsible to implement + ``_prepare_init_params_from_job_description()`` as this method delegates the actual conversion of a training + job description to the arguments that the class constructor expects. After attaching, if the training job has a + Complete status, it can be ``deploy()`` ed to create a SageMaker Endpoint and return a ``Predictor``. + + If the training job is in progress, attach will block and display log messages + from the training job, until the training job completes. + + Args: + training_job_name (str): The name of the training job to attach to. + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with + Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one + using the default AWS configuration chain. + + Examples: + >>> my_estimator.fit(wait=False) + >>> training_job_name = my_estimator.latest_training_job.name + Later on: + >>> attached_estimator = Estimator.attach(training_job_name) + >>> attached_estimator.deploy() + + Returns: + Instance of the calling ``Estimator`` Class with the attached training job. + """ + sagemaker_session = sagemaker_session or Session() + + job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) + init_params = cls._prepare_init_params_from_job_description(job_details) + + estimator = cls(sagemaker_session=sagemaker_session, **init_params) + estimator.latest_training_job = _TrainingJob(sagemaker_session=sagemaker_session, + training_job_name=init_params['base_job_name']) + estimator.latest_training_job.wait() + return estimator def deploy(self, initial_instance_count, instance_type, endpoint_name=None, **kwargs): """Deploy the trained model to an Amazon SageMaker endpoint and return a ``sagemaker.RealTimePredictor`` object. 
@@ -202,21 +254,33 @@ def create_model(self, **kwargs): """ pass - @staticmethod - def _prepare_estimator_params_from_job_description(job_details): - estimator_params = dict() + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor + + Args: + job_details: the returned job details from a describe_training_job API call. + + Returns: + dictionary: The transformed init_params + + """ + init_params = dict() - estimator_params['role'] = job_details['RoleArn'] - estimator_params['train_instance_count'] = job_details['ResourceConfig']['InstanceCount'] - estimator_params['train_instance_type'] = job_details['ResourceConfig']['InstanceType'] - estimator_params['train_volume_size'] = job_details['ResourceConfig']['VolumeSizeInGB'] - estimator_params['train_max_run'] = job_details['StoppingCondition']['MaxRuntimeInSeconds'] - estimator_params['input_mode'] = job_details['AlgorithmSpecification']['TrainingInputMode'] - estimator_params['base_job_name'] = job_details['TrainingJobName'] - estimator_params['output_path'] = job_details['OutputDataConfig']['S3OutputPath'] - estimator_params['output_kms_key'] = job_details['OutputDataConfig']['KmsKeyId'] + init_params['role'] = job_details['RoleArn'] + init_params['train_instance_count'] = job_details['ResourceConfig']['InstanceCount'] + init_params['train_instance_type'] = job_details['ResourceConfig']['InstanceType'] + init_params['train_volume_size'] = job_details['ResourceConfig']['VolumeSizeInGB'] + init_params['train_max_run'] = job_details['StoppingCondition']['MaxRuntimeInSeconds'] + init_params['input_mode'] = job_details['AlgorithmSpecification']['TrainingInputMode'] + init_params['base_job_name'] = job_details['TrainingJobName'] + init_params['output_path'] = job_details['OutputDataConfig']['S3OutputPath'] + init_params['output_kms_key'] = job_details['OutputDataConfig']['KmsKeyId'] - return 
estimator_params, job_details['HyperParameters'], job_details['AlgorithmSpecification']['TrainingImage'] + init_params['hyperparameters'] = job_details['HyperParameters'] + init_params['image'] = job_details['AlgorithmSpecification']['TrainingImage'] + + return init_params def delete_endpoint(self): """Delete an Amazon SageMaker ``Endpoint``. @@ -333,7 +397,8 @@ class Estimator(EstimatorBase): def __init__(self, image_name, role, train_instance_count, train_instance_type, train_volume_size=30, train_max_run=24 * 60 * 60, input_mode='File', - output_path=None, output_kms_key=None, base_job_name=None, sagemaker_session=None): + output_path=None, output_kms_key=None, base_job_name=None, sagemaker_session=None, + hyperparameters=None): """Initialize an ``Estimator`` instance. Args: @@ -365,9 +430,10 @@ def __init__(self, image_name, role, train_instance_count, train_instance_type, sagemaker_session (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one using the default AWS configuration chain. + hyperparameters (dict): Dictionary containing the hyperparameters to initialize this estimator with. 
""" self.image_name = image_name - self.hyperparam_dict = {} + self.hyperparam_dict = hyperparameters.copy() if hyperparameters else {} super(Estimator, self).__init__(role, train_instance_count, train_instance_type, train_volume_size, train_max_run, input_mode, output_path, output_kms_key, base_job_name, sagemaker_session) @@ -422,6 +488,22 @@ def predict_wrapper(endpoint, session): return Model(self.model_data, image or self.train_image(), self.role, sagemaker_session=self.sagemaker_session, predictor_cls=predictor_cls, **kwargs) + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor + + Args: + job_details: the returned job details from a describe_training_job API call. + + Returns: + dictionary: The transformed init_params + + """ + init_params = super(Estimator, cls)._prepare_init_params_from_job_description(job_details) + + init_params['image_name'] = init_params.pop('image') + return init_params + class Framework(EstimatorBase): """Base class that cannot be instantiated directly. @@ -528,12 +610,37 @@ def hyperparameters(self): return self._json_encode_hyperparameters(self._hyperparameters) @classmethod - def attach(cls, training_job_name, sagemaker_session=None, **kwargs): + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor + + Args: + job_details: the returned job details from a describe_training_job API call. 
+ + Returns: + dictionary: The transformed init_params + + """ + init_params = super(Framework, cls)._prepare_init_params_from_job_description(job_details) + + init_params['entry_point'] = json.loads(init_params['hyperparameters'].get(SCRIPT_PARAM_NAME)) + init_params['source_dir'] = json.loads(init_params['hyperparameters'].get(DIR_PARAM_NAME)) + init_params['enable_cloudwatch_metrics'] = json.loads( + init_params['hyperparameters'].get(CLOUDWATCH_METRICS_PARAM_NAME)) + init_params['container_log_level'] = json.loads( + init_params['hyperparameters'].get(CONTAINER_LOG_LEVEL_PARAM_NAME)) + + init_params['hyperparameters'] = {k: json.loads(v) for k, v in init_params['hyperparameters'].items()} + + return init_params + + @classmethod + def attach(cls, training_job_name, sagemaker_session=None): """Attach to an existing training job. - Create an Estimator bound to an existing training job. After attaching, if - the training job has a Complete status, it can be ``deploy()`` ed to create - a SageMaker Endpoint and return a ``Predictor``. + Create an Estimator bound to an existing training job, each subclass is responsible to implement + ``_prepare_init_params_from_job_description()`` as this method delegates the actual conversion of a training + job description to the arguments that the class constructor expects. After attaching, if the training job has a + Complete status, it can be ``deploy()`` ed to create a SageMaker Endpoint and return a ``Predictor``. If the training job is in progress, attach will block and display log messages from the training job, until the training job completes. @@ -543,41 +650,18 @@ def attach(cls, training_job_name, sagemaker_session=None, **kwargs): sagemaker_session (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one using the default AWS configuration chain. 
- **kwargs: Additional kwargs passed to the :class:`~sagemaker.estimator.Estimator` constructor. + + Examples: + >>> my_estimator.fit(wait=False) + >>> training_job_name = my_estimator.latest_training_job.name + Later on: + >>> attached_estimator = Estimator.attach(training_job_name) + >>> attached_estimator.deploy() Returns: - sagemaker.estimator.Framework: ``Estimator`` with the attached training job. + Instance of the calling ``Estimator`` Class with the attached training job. """ - sagemaker_session = sagemaker_session or Session() - - if training_job_name is not None: - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, _ = cls._prepare_estimator_params_from_job_description(job_details) - - else: - # this case is only valid when called from inheriting class and then the class must declare framework - if not hasattr(cls, '__framework_name__'): - raise ValueError('must specify training_job name') - init_params = dict(kwargs) - hp = init_params.pop('hyperparameters') - - # parameters for framework classes - framework_init_params = dict() - framework_init_params['entry_point'] = json.loads(hp.get(SCRIPT_PARAM_NAME)) - framework_init_params['source_dir'] = json.loads(hp.get(DIR_PARAM_NAME)) - framework_init_params['enable_cloudwatch_metrics'] = json.loads(hp.get(CLOUDWATCH_METRICS_PARAM_NAME)) - framework_init_params['container_log_level'] = json.loads(hp.get(CONTAINER_LOG_LEVEL_PARAM_NAME)) - - # drop json and remove other SageMaker specific additions - hyperparameters = {entry: json.loads(hp[entry]) for entry in hp} - framework_init_params['hyperparameters'] = hyperparameters - - init_params.update(framework_init_params) - - estimator = cls(sagemaker_session=sagemaker_session, **init_params) - estimator.latest_training_job = _TrainingJob(sagemaker_session=sagemaker_session, - training_job_name=init_params['base_job_name']) - estimator.latest_training_job.wait() + estimator = 
super(Framework, cls).attach(training_job_name, sagemaker_session) estimator.uploaded_code = UploadedCode(estimator.source_dir, estimator.entry_point) return estimator diff --git a/src/sagemaker/mxnet/estimator.py b/src/sagemaker/mxnet/estimator.py index b41975345c..2793089c0f 100644 --- a/src/sagemaker/mxnet/estimator.py +++ b/src/sagemaker/mxnet/estimator.py @@ -14,7 +14,6 @@ from sagemaker.estimator import Framework from sagemaker.fw_utils import create_image_uri, framework_name_from_image from sagemaker.mxnet.model import MXNetModel -from sagemaker.session import Session class MXNet(Framework): @@ -83,42 +82,23 @@ def create_model(self, model_server_workers=None): sagemaker_session=self.sagemaker_session) @classmethod - def attach(cls, training_job_name, sagemaker_session=None): - """Attach to an existing training job. - - Create an ``Estimator`` bound to an existing training job. After attaching, if - the training job is in a Complete status, it can be ``deploy``ed to create - a SageMaker ``Endpoint`` and return a ``Predictor``. - - If the training job is in progress, attach will block and display log messages - from the training job, until the training job completes. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - training_job_name (str): The name of the training job to attach to. - sagemaker_session (sagemaker.session.Session): Session object which manages interactions with - Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one - using the default AWS configuration chain. + job_details: the returned job details from a describe_training_job API call. Returns: - sagemaker.mxnet.estimator.MXNet: ``Estimator`` with the attached training job. + dictionary: The transformed init_params - Raises: - ValueError: If `training_job_name` is None or the image name does not match the framework. 
""" - sagemaker_session = sagemaker_session or Session() - - if training_job_name is None: - raise ValueError("must specify training_job name") - - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - - init_params.update({'hyperparameters': hp}) + init_params = super(MXNet, cls)._prepare_init_params_from_job_description(job_details) + framework, py_version = framework_name_from_image(init_params.pop('image')) - framework, py_version = framework_name_from_image(image) - init_params.update({'py_version': py_version}) + init_params['py_version'] = py_version + training_job_name = init_params['base_job_name'] if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(MXNet, cls).attach(training_job_name=None, sagemaker_session=sagemaker_session, **init_params) + return init_params diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index 39f9601b3c..463a57d9df 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -11,16 +11,14 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. 
import logging +import os import subprocess import tempfile import threading -import os - import sagemaker.tensorflow from sagemaker.estimator import Framework from sagemaker.fw_utils import create_image_uri, framework_name_from_image -from sagemaker.session import Session from sagemaker.tensorflow.model import TensorFlowModel logging.basicConfig() @@ -153,6 +151,9 @@ def fit(self, inputs, wait=True, logs=True, job_name=None, run_tensorboard_local def fit_super(): super(TensorFlow, self).fit(inputs, wait, logs, job_name) + if run_tensorboard_locally and wait is False: + raise ValueError("Tensorboard is not supported with async fit") + if run_tensorboard_locally: tensorboard = Tensorboard(self) tensorboard.validate_requirements() @@ -166,48 +167,32 @@ def fit_super(): fit_super() @classmethod - def attach(cls, training_job_name, sagemaker_session=None): - """Attach to an existing training job. - - Create an ``Estimator`` bound to an existing training job. After attaching, if - the training job is in a Complete status, it can be ``deploy``ed to create - a SageMaker ``Endpoint`` and return a ``Predictor``. - - If the training job is in progress, attach will block and display log messages - from the training job, until the training job completes. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - training_job_name (str): The name of the training job to attach to. - sagemaker_session (sagemaker.session.Session): Session object which manages interactions with - Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one - using the default AWS configuration chain. + job_details: the returned job details from a describe_training_job API call. Returns: - sagemaker.tensorflow.estimator.TensorFlow: ``Estimator`` with the attached training job. 
+ dictionary: The transformed init_params - Raises: - ValueError: If `training_job_name` is None or the image name does not match the framework. """ - sagemaker_session = sagemaker_session or Session() - - if training_job_name is None: - raise ValueError("must specify training_job name") - - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - - updated_params = cls._update_init_params(hp, ['checkpoint_path', 'training_steps', 'evaluation_steps']) - init_params.update(updated_params) + init_params = super(TensorFlow, cls)._prepare_init_params_from_job_description(job_details) - init_params.update({'hyperparameters': hp}) + # Move some of the tensorflow specific init params from hyperparameters into the main init params. + for argument in ['checkpoint_path', 'training_steps', 'evaluation_steps']: + value = init_params['hyperparameters'].pop(argument, None) + if value is not None: + init_params[argument] = value - framework, py_version = framework_name_from_image(image) - init_params.update({'py_version': py_version}) + framework, py_version = framework_name_from_image(init_params.pop('image')) + init_params['py_version'] = py_version + training_job_name = init_params['base_job_name'] if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(TensorFlow, cls).attach(training_job_name=None, sagemaker_session=sagemaker_session, **init_params) + return init_params def train_image(self): """Return the Docker image to use for training. 
diff --git a/tests/integ/test_byo_estimator.py b/tests/integ/test_byo_estimator.py index d0c1a18e07..71f3c86862 100644 --- a/tests/integ/test_byo_estimator.py +++ b/tests/integ/test_byo_estimator.py @@ -29,6 +29,13 @@ from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name +def fm_serializer(data): + js = {'instances': []} + for row in data: + js['instances'].append({'features': row.tolist()}) + return json.dumps(js) + + def test_byo_estimator(): """Use Factorization Machines algorithm as an example here. @@ -79,12 +86,6 @@ def test_byo_estimator(): endpoint_name = name_from_base('byo') - def fm_serializer(data): - js = {'instances': []} - for row in data: - js['instances'].append({'features': row.tolist()}) - return json.dumps(js) - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): model = estimator.create_model() predictor = model.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) @@ -97,3 +98,61 @@ def fm_serializer(data): assert len(result['predictions']) == 10 for prediction in result['predictions']: assert prediction['score'] is not None + + +def test_async_byo_estimator(): + image_name = registry(REGION) + "/factorization-machines:1" + endpoint_name = name_from_base('byo') + training_job_name = "" + + with timeout(minutes=5): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + # take 100 examples for faster execution + vectors = np.array([t.tolist() for t in train_set[0][:100]]).astype('float32') + labels = np.where(np.array([t.tolist() for t in train_set[1][:100]]) == 0, 1.0, 0.0).astype('float32') + + buf = io.BytesIO() + write_numpy_to_dense_tensor(buf, vectors, labels) + buf.seek(0) + + bucket = 
sagemaker_session.default_bucket() + prefix = 'test_byo_estimator' + key = 'recordio-pb-data' + boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) + s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) + + estimator = Estimator(image_name=image_name, + role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session, base_job_name='test-byo') + + estimator.set_hyperparameters(num_factors=10, + feature_dim=784, + mini_batch_size=100, + predictor_type='binary_classifier') + + # training labels must be 'float32' + estimator.fit({'train': s3_train_data}, wait=False) + training_job_name = estimator.latest_training_job.name + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=30): + estimator = Estimator.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = estimator.create_model() + predictor = model.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) + predictor.serializer = fm_serializer + predictor.content_type = 'application/json' + predictor.deserializer = sagemaker.predictor.json_deserializer + + result = predictor.predict(train_set[0][:10]) + + assert len(result['predictions']) == 10 + for prediction in result['predictions']: + assert prediction['score'] is not None + + assert estimator.train_image() == image_name diff --git a/tests/integ/test_factorization_machines.py b/tests/integ/test_factorization_machines.py index 76fbb93ac7..cc04ed8d6a 100644 --- a/tests/integ/test_factorization_machines.py +++ b/tests/integ/test_factorization_machines.py @@ -13,6 +13,7 @@ import gzip import pickle import sys +import time import boto3 import os @@ -53,3 +54,45 @@ def test_factorization_machines(): assert len(result) == 10 for record in result: assert record.label["score"] is not None + + +def test_async_factorization_machines(): + + training_job_name = "" + endpoint_name = 
name_from_base('factorizationMachines') + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=5): + + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + fm = FactorizationMachines(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + num_factors=10, predictor_type='regressor', + epochs=2, clip_gradient=1e2, eps=0.001, rescale_grad=1.0 / 100, + sagemaker_session=sagemaker_session, base_job_name='test-fm') + + # training labels must be 'float32' + fm.fit(fm.record_set(train_set[0][:200], train_set[1][:200].astype('float32')), wait=False) + training_job_name = fm.latest_training_job.name + + print("Detached from training job. Will re-attach in 20 seconds") + time.sleep(20) + print("attaching now...") + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): + estimator = FactorizationMachines.attach(training_job_name=training_job_name, + sagemaker_session=sagemaker_session) + model = FactorizationMachinesModel(estimator.model_data, role='SageMakerRole', + sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + result = predictor.predict(train_set[0][:10]) + + assert len(result) == 10 + for record in result: + assert record.label["score"] is not None diff --git a/tests/integ/test_kmeans.py b/tests/integ/test_kmeans.py index 09780f69cd..bcaba3ce02 100644 --- a/tests/integ/test_kmeans.py +++ b/tests/integ/test_kmeans.py @@ -16,6 +16,7 @@ import boto3 import os +import time import sagemaker from sagemaker import KMeans, KMeansModel @@ -60,3 +61,49 @@ def test_kmeans(): for record in result: assert record.label["closest_cluster"] is not None assert 
record.label["distance_to_cluster"] is not None + + +def test_async_kmeans(): + + training_job_name = "" + endpoint_name = name_from_base('kmeans') + + with timeout(minutes=5): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + kmeans = KMeans(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + k=10, sagemaker_session=sagemaker_session, base_job_name='test-kmeans') + + kmeans.init_method = 'random' + kmeans.max_iterators = 1 + kmeans.tol = 1 + kmeans.num_trials = 1 + kmeans.local_init_method = 'kmeans++' + kmeans.half_life_time_size = 1 + kmeans.epochs = 1 + kmeans.center_factor = 1 + + kmeans.fit(kmeans.record_set(train_set[0][:100]), wait=False) + training_job_name = kmeans.latest_training_job.name + + print("Detached from training job. 
Will re-attach in 20 seconds") + time.sleep(20) + print("attaching now...") + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): + estimator = KMeans.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = KMeansModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + result = predictor.predict(train_set[0][:10]) + + assert len(result) == 10 + for record in result: + assert record.label["closest_cluster"] is not None + assert record.label["distance_to_cluster"] is not None diff --git a/tests/integ/test_linear_learner.py b/tests/integ/test_linear_learner.py index 31b9f506f3..6b287b3186 100644 --- a/tests/integ/test_linear_learner.py +++ b/tests/integ/test_linear_learner.py @@ -14,9 +14,11 @@ import os import pickle import sys +import time import pytest # noqa import boto3 import numpy as np + import sagemaker from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerModel from sagemaker.utils import name_from_base @@ -84,3 +86,72 @@ def test_linear_learner(): for record in result: assert record.label["predicted_label"] is not None assert record.label["score"] is not None + + +def test_async_linear_learner(): + + training_job_name = "" + endpoint_name = 'test-linear-learner-async-{}'.format(int(time.time())) + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=5): + + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + train_set[1][:100] = 1 + train_set[1][100:200] = 0 + train_set = train_set[0], train_set[1].astype(np.dtype('float32')) + + ll = LinearLearner('SageMakerRole', 1, 
'ml.c4.2xlarge', base_job_name='test-linear-learner', + sagemaker_session=sagemaker_session) + ll.binary_classifier_model_selection_criteria = 'accuracy' + ll.target_recall = 0.5 + ll.target_precision = 0.5 + ll.positive_example_weight_mult = 0.1 + ll.epochs = 1 + ll.predictor_type = 'binary_classifier' + ll.use_bias = True + ll.num_models = 1 + ll.num_calibration_samples = 1 + ll.init_method = 'uniform' + ll.init_scale = 0.5 + ll.init_sigma = 0.2 + ll.init_bias = 5 + ll.optimizer = 'adam' + ll.loss = 'logistic' + ll.wd = 0.5 + ll.l1 = 0.5 + ll.momentum = 0.5 + ll.learning_rate = 0.1 + ll.beta_1 = 0.1 + ll.beta_2 = 0.1 + ll.use_lr_scheduler = True + ll.lr_scheduler_step = 2 + ll.lr_scheduler_factor = 0.5 + ll.lr_scheduler_minimum_lr = 0.1 + ll.normalize_data = False + ll.normalize_label = False + ll.unbias_data = True + ll.unbias_label = False + ll.num_point_for_scala = 10000 + ll.fit(ll.record_set(train_set[0][:200], train_set[1][:200]), wait=False) + training_job_name = ll.latest_training_job.name + + print("Waiting to re-attach to the training job: %s" % training_job_name) + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): + estimator = LinearLearner.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = LinearLearnerModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + + result = predictor.predict(train_set[0][0:100]) + assert len(result) == 100 + for record in result: + assert record.label["predicted_label"] is not None + assert record.label["score"] is not None diff --git a/tests/integ/test_mxnet_train.py b/tests/integ/test_mxnet_train.py index 94feb6e9e1..593f31e5d7 100644 --- a/tests/integ/test_mxnet_train.py +++ b/tests/integ/test_mxnet_train.py @@ -58,6 +58,38 @@ def test_attach_deploy(mxnet_training_job, sagemaker_session): predictor.predict(data) +def 
test_async_fit(sagemaker_session): + + training_job_name = "" + endpoint_name = 'test-mxnet-attach-deploy-{}'.format(int(time.time())) + + with timeout(minutes=5): + script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') + data_path = os.path.join(DATA_DIR, 'mxnet_mnist') + + mx = MXNet(entry_point=script_path, role='SageMakerRole', + train_instance_count=1, train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session) + + train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), + key_prefix='integ-test-data/mxnet_mnist/train') + test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), + key_prefix='integ-test-data/mxnet_mnist/test') + + mx.fit({'train': train_input, 'test': test_input}, wait=False) + training_job_name = mx.latest_training_job.name + + print("Waiting to re-attach to the training job: %s" % training_job_name) + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): + print("Re-attaching now to: %s" % training_job_name) + estimator = MXNet.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + predictor = estimator.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + + def test_deploy_model(mxnet_training_job, sagemaker_session): endpoint_name = 'test-mxnet-deploy-model-{}'.format(int(time.time())) diff --git a/tests/integ/test_pca.py b/tests/integ/test_pca.py index adec22345e..b13994f35a 100644 --- a/tests/integ/test_pca.py +++ b/tests/integ/test_pca.py @@ -14,8 +14,11 @@ import os import pickle import sys +import time + import pytest # noqa import boto3 + import sagemaker import sagemaker.amazon.pca from sagemaker.utils import name_from_base @@ -55,3 +58,47 @@ def test_pca(): assert len(result) == 5 for record in result: assert record.label["projection"] is not None + + +def test_async_pca(): + + training_job_name = "" + 
endpoint_name = name_from_base('pca') + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=20): + + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + pca = sagemaker.amazon.pca.PCA(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.m4.xlarge', + num_components=48, sagemaker_session=sagemaker_session, base_job_name='test-pca') + + pca.algorithm_mode = 'randomized' + pca.subtract_mean = True + pca.extra_components = 5 + pca.fit(pca.record_set(train_set[0][:100]), wait=False) + training_job_name = pca.latest_training_job.name + + print("Detached from training job. Will re-attach in 20 seconds") + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + estimator = sagemaker.amazon.pca.PCA.attach(training_job_name=training_job_name, + sagemaker_session=sagemaker_session) + + model = sagemaker.amazon.pca.PCAModel(estimator.model_data, role='SageMakerRole', + sagemaker_session=sagemaker_session) + predictor = model.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge", + endpoint_name=endpoint_name) + + result = predictor.predict(train_set[0][:5]) + + assert len(result) == 5 + for record in result: + assert record.label["projection"] is not None diff --git a/tests/integ/test_tf.py b/tests/integ/test_tf.py index bb602b83fe..dc3c13cc96 100644 --- a/tests/integ/test_tf.py +++ b/tests/integ/test_tf.py @@ -13,6 +13,7 @@ import boto3 import os import pytest +import time from sagemaker import Session from sagemaker.tensorflow import TensorFlow @@ -52,6 +53,35 @@ def test_tf(sagemaker_session): print('predict result: {}'.format(result)) +def test_tf_async(sagemaker_session): + + training_job_name = "" + with 
timeout(minutes=5): + script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') + + estimator = TensorFlow(entry_point=script_path, + role='SageMakerRole', + training_steps=1, + evaluation_steps=1, + hyperparameters={'input_tensor_name': 'inputs'}, + train_instance_count=1, + train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session, + base_job_name='test-tf') + + inputs = estimator.sagemaker_session.upload_data(path=DATA_PATH, key_prefix='integ-test-data/tf_iris') + estimator.fit(inputs, wait=False) + training_job_name = estimator.latest_training_job.name + time.sleep(20) + + with timeout_and_delete_endpoint(estimator=estimator, minutes=35): + estimator = TensorFlow.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + json_predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge') + + result = json_predictor.predict([6.4, 3.2, 4.5, 1.5]) + print('predict result: {}'.format(result)) + + def test_failed_tf_training(sagemaker_session): with timeout(minutes=15): script_path = os.path.join(DATA_DIR, 'iris', 'failure_script.py') diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index eb999c4a18..7542cb6880 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -70,6 +70,12 @@ def train_image(self): def create_model(self): return DummyFrameworkModel(self.sagemaker_session) + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + init_params = super(DummyFramework, cls)._prepare_init_params_from_job_description(job_details) + init_params.pop("image", None) + return init_params + class DummyFrameworkModel(FrameworkModel): @@ -251,12 +257,6 @@ def test_attach_framework(sagemaker_session): assert framework_estimator.entry_point == 'iris-dnn-classifier.py' -def test_attach_no_job_name_framework(sagemaker_session): - with pytest.raises(ValueError) as error: - Framework.attach(training_job_name=None, 
sagemaker_session=sagemaker_session) - assert 'must specify training_job name' in str(error) - - def test_fit_then_fit_again(sagemaker_session): fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, diff --git a/tests/unit/test_mxnet.py b/tests/unit/test_mxnet.py index 1f09b54849..a325facb48 100644 --- a/tests/unit/test_mxnet.py +++ b/tests/unit/test_mxnet.py @@ -201,9 +201,3 @@ def test_attach_wrong_framework(sagemaker_session): with pytest.raises(ValueError) as error: MXNet.attach(training_job_name='neo', sagemaker_session=sagemaker_session) assert "didn't use image for requested framework" in str(error) - - -def test_attach_no_job_name(sagemaker_session): - with pytest.raises(ValueError) as error: - MXNet.attach(training_job_name=None, sagemaker_session=sagemaker_session) - assert "must specify training_job name" in str(error) diff --git a/tests/unit/test_tf_estimator.py b/tests/unit/test_tf_estimator.py index dd73ac293b..367e66da65 100644 --- a/tests/unit/test_tf_estimator.py +++ b/tests/unit/test_tf_estimator.py @@ -379,9 +379,3 @@ def test_attach_wrong_framework(sagemaker_session): with pytest.raises(ValueError) as error: TensorFlow.attach(training_job_name='neo', sagemaker_session=sagemaker_session) assert "didn't use image for requested framework" in str(error) - - -def test_attach_no_job_name(sagemaker_session): - with pytest.raises(ValueError) as error: - TensorFlow.attach(training_job_name=None, sagemaker_session=sagemaker_session) - assert "must specify training_job name" in str(error) diff --git a/tox.ini b/tox.ini index a42fe09296..09795ca02e 100644 --- a/tox.ini +++ b/tox.ini @@ -27,7 +27,11 @@ max-complexity = 10 [testenv] # TEAMCITY_VERSION environment variable exists during build on Teamcity. teamcity-messages uses it in order to enable # reporting to TeamCity. 
-passenv = TEAMCITY_VERSION +passenv = + TEAMCITY_VERSION + AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY + AWS_SESSION_TOKEN # {posargs} can be passed in by additional arguments specified when invoking tox. # Can be used to specify which tests to run, e.g.: tox -- -s commands =