From eaae90fde1db8ec830c8e413a35364be91593131 Mon Sep 17 00:00:00 2001 From: Lauren Yu <6631887+laurenyu@users.noreply.github.com> Date: Mon, 18 May 2020 14:38:07 -0700 Subject: [PATCH 1/2] breaking: remove estimator parameters for TF legacy mode --- src/sagemaker/tensorflow/estimator.py | 397 +++---------------- tests/conftest.py | 4 - tests/unit/test_airflow.py | 15 +- tests/unit/test_sync_directories.py | 94 ----- tests/unit/test_tf_estimator.py | 551 +------------------------- tests/unit/test_tfs.py | 2 - 6 files changed, 68 insertions(+), 995 deletions(-) delete mode 100644 tests/unit/test_sync_directories.py diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index 47440ba21d..d83d9a1a08 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -10,18 +10,15 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -"""Placeholder docstring""" +"""An estimator class for training with TensorFlow on Amazon SageMaker.""" from __future__ import absolute_import -import contextlib import logging import os -import shutil -import subprocess -import tempfile -import threading -import time +from packaging import version + +from sagemaker import utils from sagemaker.debugger import DebuggerHookConfig from sagemaker.estimator import Framework import sagemaker.fw_utils as fw @@ -29,24 +26,11 @@ from sagemaker.tensorflow.model import TensorFlowModel from sagemaker.tensorflow.serving import Model from sagemaker.transformer import Transformer -from sagemaker import utils from sagemaker.vpc_utils import VPC_CONFIG_DEFAULT logger = logging.getLogger("sagemaker") - -_FRAMEWORK_MODE_ARGS = ( - "training_steps", - "evaluation_steps", - "requirements_file", - "checkpoint_path", -) -_SCRIPT_MODE = "tensorflow-scriptmode" -_SCRIPT_MODE_SERVING_ERROR_MSG = ( - "Script mode containers does not support serving yet. " - "Please use our new tensorflow-serving container by creating the model " - "with 'endpoint_type' set to 'tensorflow-serving'." -) +# TODO: consider creating a function for generating this command before removing this constant _SCRIPT_MODE_TENSORBOARD_WARNING = ( "Tensorboard is not supported with script mode. You can run the following " "command: tensorboard --logdir %s --host localhost --port 6006 This can be " @@ -54,186 +38,34 @@ ) -class Tensorboard(threading.Thread): - """Placeholder docstring""" - - def __init__(self, estimator, logdir=None): - """Initialize ``Tensorboard`` instance. - - Args: - estimator (sagemaker.estimator.Framework): A SageMaker ``Estimator``. - logdir (str): Directory for logs (default: None). If not specified, a temporary - directory is made. - """ - threading.Thread.__init__(self) - self.event = threading.Event() - self.estimator = estimator - self.logdir = logdir or tempfile.mkdtemp() - - @staticmethod - def _cmd_exists(cmd): - """Placeholder docstring""" - for path in os.environ["PATH"].split(os.pathsep): - try: - if os.access(os.path.join(path, cmd), os.X_OK): - return True - except StopIteration: - return False - return False - - @staticmethod - def _sync_directories(from_directory, to_directory): - """Sync to_directory with from_directory by copying each file in - to_directory with new contents. Files in to_directory will be - overwritten by files of the same name in from_directory. 
We need to - keep two copies of the log directory because otherwise TensorBoard - picks up temp files from `aws s3 sync` and then stops reading the - correct tfevent files. We walk the directory and copy each file - individually because the directory that TensorBoard watches needs to - always exist. - - Args: - from_directory (str): The directory with updated files. - to_directory (str): The directory to be synced. - """ - if not os.path.exists(to_directory): - os.mkdir(to_directory) - for root, dirs, files in os.walk(from_directory): - to_root = root.replace(from_directory, to_directory) - for directory in dirs: - to_child_dir = os.path.join(to_root, directory) - if not os.path.exists(to_child_dir): - os.mkdir(to_child_dir) - for fname in files: - from_file = os.path.join(root, fname) - to_file = os.path.join(to_root, fname) - with open(from_file, "rb") as a, open(to_file, "wb") as b: - b.write(a.read()) - - @staticmethod - @contextlib.contextmanager - def _temporary_directory(): - """Context manager for a temporary directory. This is similar to - tempfile.TemporaryDirectory in python>=3.2. - """ - name = tempfile.mkdtemp() - try: - yield name - finally: - shutil.rmtree(name) - - def validate_requirements(self): - """Ensure that TensorBoard and the AWS CLI are installed. - - These dependencies are required for using TensorBoard. - - Raises: - EnvironmentError: If at least one requirement is not installed. - """ - if not self._cmd_exists("tensorboard"): - raise EnvironmentError( - "TensorBoard is not installed in the system. Please install TensorBoard using the" - " following command: \n pip install tensorboard" - ) - - if not self._cmd_exists("aws"): - raise EnvironmentError( - "The AWS CLI is not installed in the system. Please install the AWS CLI using the" - " following command: \n pip install awscli" - ) - - def create_tensorboard_process(self): - """Create a TensorBoard process. - - Returns: - tuple: A tuple containing: - int: The port number. - process: The TensorBoard process. - - Raises: - OSError: If no ports between 6006 and 6105 are available for starting TensorBoard. - """ - port = 6006 - - for _ in range(100): - p = subprocess.Popen( - [ - "tensorboard", - "--logdir", - self.logdir, - "--host", - "localhost", - "--port", - str(port), - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - self.event.wait(5) - if p.poll(): - port += 1 - else: - return port, p - - raise OSError( - "No available ports to start TensorBoard. 
Attempted all ports between 6006 and 6105" - ) - - def run(self): - """Run TensorBoard process.""" - port, tensorboard_process = self.create_tensorboard_process() - - logger.info("TensorBoard 0.1.7 at http://localhost:%s", port) - while not self.estimator.checkpoint_path: - self.event.wait(1) - with self._temporary_directory() as aws_sync_dir: - while not self.event.is_set(): - args = ["aws", "s3", "sync", self.estimator.checkpoint_path, aws_sync_dir] - subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self._sync_directories(aws_sync_dir, self.logdir) - self.event.wait(10) - tensorboard_process.terminate() - - class TensorFlow(Framework): """Handle end-to-end training and deployment of user-provided TensorFlow code.""" __framework_name__ = "tensorflow" + _SCRIPT_MODE_REPO_NAME = "tensorflow-scriptmode" LATEST_VERSION = defaults.LATEST_VERSION _LATEST_1X_VERSION = "1.15.2" - _LOWEST_SCRIPT_MODE_ONLY_VERSION = [1, 13] - # 2.0.0 still supports py2 - # we will need to update this version number if future versions still support py2 - _HIGHEST_PYTHON_2_VERSION = [2, 1, 0] + _HIGHEST_LEGACY_MODE_ONLY_VERSION = version.Version("1.10.0") + _LOWEST_SCRIPT_MODE_ONLY_VERSION = version.Version("1.13.1") + + _HIGHEST_PYTHON_2_VERSION = version.Version("2.1.0") def __init__( self, - training_steps=None, - evaluation_steps=None, - checkpoint_path=None, py_version=None, framework_version=None, model_dir=None, - requirements_file="", image_name=None, - script_mode=False, distributions=None, + script_mode=True, **kwargs ): """Initialize a ``TensorFlow`` estimator. Args: - training_steps (int): Perform this many steps of training. `None`, the default means - train forever. - evaluation_steps (int): Perform this many steps of evaluation. `None`, the default - means that evaluation runs until input from eval_input_fn is exhausted (or another - exception is raised). - checkpoint_path (str): Identifies S3 location where checkpoint data during model - training can be saved (default: None). For distributed model training, this - parameter is required. py_version (str): Python version you want to use for executing your model training code (default: 'py2'). framework_version (str): TensorFlow version you want to use for executing your model @@ -251,10 +83,6 @@ def __init__( * *Local Mode with local sources (file:// instead of s3://)* - \ ``/opt/ml/shared/model`` - requirements_file (str): Path to a ``requirements.txt`` file (default: ''). The path - should be within and relative to ``source_dir``. Details on the format can be - found in the Pip User Guide: - image_name (str): If specified, the estimator will use this image for training and hosting, instead of selecting the appropriate SageMaker official image based on framework_version and py_version. It can be an ECR url or dockerhub image and tag. @@ -262,8 +90,6 @@ def __init__( Examples: 123.dkr.ecr.us-west-2.amazonaws.com/my-custom-image:1.0 custom-image:latest. - script_mode (bool): If set to True will the estimator will use the Script Mode - containers (default: False). This will be ignored if py_version is set to 'py3'. distributions (dict): A dictionary with information on how to run distributed training (default: None). Currently we support distributed training with parameter servers and MPI. @@ -289,6 +115,8 @@ def __init__( } } + script_mode (bool): Whether or not to use the Script Mode TensorFlow images + (default: True). **kwargs: Additional kwargs passed to the Framework constructor. .. 
tip:: @@ -322,60 +150,21 @@ def __init__( kwargs["enable_sagemaker_metrics"] = True super(TensorFlow, self).__init__(image_name=image_name, **kwargs) - self.checkpoint_path = checkpoint_path self.py_version = py_version - self.training_steps = training_steps - self.evaluation_steps = evaluation_steps self.model_dir = model_dir - self.script_mode = script_mode self.distributions = distributions or {} - self._validate_args( - py_version=py_version, - script_mode=script_mode, - framework_version=self.framework_version, - training_steps=training_steps, - evaluation_steps=evaluation_steps, - requirements_file=requirements_file, - checkpoint_path=checkpoint_path, - ) - self._validate_requirements_file(requirements_file) - self.requirements_file = requirements_file + self._script_mode_enabled = script_mode + self._validate_args(py_version=py_version, framework_version=self.framework_version) - def _validate_args( - self, - py_version, - script_mode, - framework_version, - training_steps, - evaluation_steps, - requirements_file, - checkpoint_path, - ): + def _validate_args(self, py_version, framework_version): """Placeholder docstring""" - if py_version == "py3" or script_mode: - + if py_version == "py3": if framework_version is None: raise AttributeError(fw.EMPTY_FRAMEWORK_VERSION_ERROR) - found_args = [] - if training_steps: - found_args.append("training_steps") - if evaluation_steps: - found_args.append("evaluation_steps") - if requirements_file: - found_args.append("requirements_file") - if checkpoint_path: - found_args.append("checkpoint_path") - if found_args: - raise AttributeError( - "{} are deprecated in script mode. Please do not set {}.".format( - ", ".join(_FRAMEWORK_MODE_ARGS), ", ".join(found_args) - ) - ) - if py_version == "py2" and self._only_python_3_supported(): msg = ( "Python 2 containers are only available with {} and lower versions. " @@ -383,103 +172,33 @@ def _validate_args( ) raise AttributeError(msg) - if (not self._script_mode_enabled()) and self._only_script_mode_supported(): + if (not self._script_mode_enabled) and self._only_script_mode_supported(): logger.warning( "Legacy mode is deprecated in versions 1.13 and higher. Using script mode instead." ) - self.script_mode = True + self._script_mode_enabled = True - def _only_script_mode_supported(self): - """Placeholder docstring""" - return [ - int(s) for s in self.framework_version.split(".") - ] >= self._LOWEST_SCRIPT_MODE_ONLY_VERSION + if self._only_legacy_mode_supported(): + # TODO: add link to docs to explain how to use legacy mode with v2 + logger.warning( + "TF %s supports only legacy mode. 
If you were using any legacy mode parameters " + "(training_steps, evaluation_steps, checkpoint_path, requirements_file), " + "make sure to pass them directly as hyperparameters instead.", + self.framework_version, + ) + self._script_mode_enabled = False - def _only_python_3_supported(self): + def _only_legacy_mode_supported(self): """Placeholder docstring""" - return [int(s) for s in self.framework_version.split(".")] > self._HIGHEST_PYTHON_2_VERSION + return version.Version(self.framework_version) <= self._HIGHEST_LEGACY_MODE_ONLY_VERSION - def _validate_requirements_file(self, requirements_file): + def _only_script_mode_supported(self): """Placeholder docstring""" - if not requirements_file: - return - - if not self.source_dir: - raise ValueError("Must specify source_dir along with a requirements file.") + return version.Version(self.framework_version) >= self._LOWEST_SCRIPT_MODE_ONLY_VERSION - if self.source_dir.lower().startswith("s3://"): - return - - if os.path.isabs(requirements_file): - raise ValueError( - "Requirements file {} is not a path relative to source_dir.".format( - requirements_file - ) - ) - - if not os.path.exists(os.path.join(self.source_dir, requirements_file)): - raise ValueError("Requirements file {} does not exist.".format(requirements_file)) - - def fit( - self, - inputs=None, - wait=True, - logs=True, - job_name=None, - experiment_config=None, - run_tensorboard_locally=False, - ): - """Train a model using the input training dataset. - - See :func:`~sagemaker.estimator.EstimatorBase.fit` for more details. - - Args: - inputs (str or dict or sagemaker.session.s3_input): Information about the training data. - This can be one of three types: - - * (str) - the S3 location where training data is saved. - * (dict[str, str] or dict[str, sagemaker.session.s3_input]) - If using multiple - channels for training data, you can specify a dict mapping channel names - to strings or :func:`~sagemaker.session.s3_input` objects. - * (sagemaker.session.s3_input) - channel configuration for S3 data sources that - can provide additional information as well as the path to the training dataset. - See :func:`sagemaker.session.s3_input` for full details. - - wait (bool): Whether the call should wait until the job completes (default: True). - logs (bool): Whether to show the logs produced by the job. - Only meaningful when wait is True (default: True). - job_name (str): Training job name. If not specified, the estimator generates a default - job name, based on the training image name and current timestamp. - experiment_config (dict[str, str]): Experiment management configuration. - run_tensorboard_locally (bool): Whether to execute TensorBoard in a different process - with downloaded checkpoint information (default: False). This is an experimental - feature, and requires TensorBoard and AWS CLI to be installed. It terminates - TensorBoard when execution ends. 
- """ - - def fit_super(): - super(TensorFlow, self).fit(inputs, wait, logs, job_name, experiment_config) - - if run_tensorboard_locally and wait is False: - raise ValueError("Tensorboard is not supported with async fit") - - if self._script_mode_enabled() and run_tensorboard_locally: - logger.warning(_SCRIPT_MODE_TENSORBOARD_WARNING, self.model_dir) - fit_super() - elif run_tensorboard_locally: - tensorboard = Tensorboard(self) - tensorboard.validate_requirements() - - try: - tensorboard.start() - fit_super() - finally: - # sleep 20 secs for tensorboard start up if fit() quits instantly - time.sleep(20) - tensorboard.event.set() - tensorboard.join() - else: - fit_super() + def _only_python_3_supported(self): + """Placeholder docstring""" + return version.Version(self.framework_version) > self._HIGHEST_PYTHON_2_VERSION @classmethod def _prepare_init_params_from_job_description(cls, job_details, model_channel_name=None): @@ -496,23 +215,20 @@ def _prepare_init_params_from_job_description(cls, job_details, model_channel_na job_details, model_channel_name ) - # Move some of the tensorflow specific init params from hyperparameters into the main init - # params. - for argument in ("checkpoint_path", "training_steps", "evaluation_steps", "model_dir"): - value = init_params["hyperparameters"].pop(argument, None) - if value is not None: - init_params[argument] = value + model_dir = init_params["hyperparameters"].pop("model_dir", None) + if model_dir is not None: + init_params["model_dir"] = model_dir image_name = init_params.pop("image") framework, py_version, tag, script_mode = fw.framework_name_from_image(image_name) if not framework: - # If we were unable to parse the framework name from the image it is not one of our - # officially supported images, in this case just add the image to the init params. + # If we were unable to parse the framework name from the image, it is not one of our + # officially supported images, so just add the image to the init params. 
init_params["image_name"] = image_name return init_params - if script_mode: - init_params["script_mode"] = True + if script_mode is None: + init_params["script_mode"] = False init_params["py_version"] = py_version @@ -596,7 +312,7 @@ def create_model( if "enable_network_isolation" not in kwargs: kwargs["enable_network_isolation"] = self.enable_network_isolation() - if endpoint_type == "tensorflow-serving" or self._script_mode_enabled(): + if endpoint_type == "tensorflow-serving" or self._script_mode_enabled: return self._create_tfs_model( role=role, vpc_config_override=vpc_config_override, @@ -656,7 +372,6 @@ def _create_default_model( entry_point or self.entry_point, source_dir=source_dir or self._model_source_dir(), enable_cloudwatch_metrics=self.enable_cloudwatch_metrics, - env={"SAGEMAKER_REQUIREMENTS": self.requirements_file}, container_log_level=self.container_log_level, code_location=self.code_location, py_version=self.py_version, @@ -671,12 +386,10 @@ def _create_default_model( def hyperparameters(self): """Return hyperparameters used by your custom TensorFlow code during model training.""" hyperparameters = super(TensorFlow, self).hyperparameters() + additional_hyperparameters = {} - self.checkpoint_path = self.checkpoint_path or self._default_s3_path("checkpoints") - mpi_enabled = False - - if self._script_mode_enabled(): - additional_hyperparameters = {} + if self._script_mode_enabled: + mpi_enabled = False if "parameter_server" in self.distributions: ps_enabled = self.distributions["parameter_server"].get("enabled", False) @@ -698,13 +411,6 @@ def hyperparameters(self): self.model_dir = self.model_dir or self._default_s3_path("model", mpi=mpi_enabled) additional_hyperparameters["model_dir"] = self.model_dir - else: - additional_hyperparameters = { - "checkpoint_path": self.checkpoint_path, - "training_steps": self.training_steps, - "evaluation_steps": self.evaluation_steps, - "sagemaker_requirements": self.requirements_file, - } hyperparameters.update(Framework._json_encode_hyperparameters(additional_hyperparameters)) return hyperparameters @@ -720,13 +426,8 @@ def _default_s3_path(self, directory, mpi=False): return os.path.join(self.output_path, self._current_job_name, directory) return None - def _script_mode_enabled(self): - """Placeholder docstring""" - return self.py_version == "py3" or self.script_mode - def _validate_and_set_debugger_configs(self): - """ - Disable Debugger Hook Config for ParameterServer (PS) as it is not + """Disable Debugger Hook Config for ParameterServer (PS) as it is not supported in smdebug. 
Else, set default HookConfig @@ -753,10 +454,10 @@ def train_image(self): if self.image_name: return self.image_name - if self._script_mode_enabled(): + if self._script_mode_enabled: return fw.create_image_uri( self.sagemaker_session.boto_region_name, - _SCRIPT_MODE, + self._SCRIPT_MODE_REPO_NAME, self.train_instance_type, self.framework_version, self.py_version, diff --git a/tests/conftest.py b/tests/conftest.py index b7cab1dd44..22bbac9603 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -194,10 +194,6 @@ def xgboost_version(request): "1.9.0", "1.10", "1.10.0", - "1.11", - "1.11.0", - "1.12", - "1.12.0", ], ) def tf_version(request): diff --git a/tests/unit/test_airflow.py b/tests/unit/test_airflow.py index 835929e750..7652e8ef18 100644 --- a/tests/unit/test_airflow.py +++ b/tests/unit/test_airflow.py @@ -183,8 +183,7 @@ def test_framework_training_config_required_args(ecr_prefix, sagemaker_session): tf = tensorflow.TensorFlow( entry_point="/some/script.py", framework_version="1.10.0", - training_steps=1000, - evaluation_steps=100, + hyperparameters={"training_steps": 1000, "evaluation_steps": 100}, role="{{ role }}", train_instance_count="{{ instance_count }}", train_instance_type="ml.c4.2xlarge", @@ -228,10 +227,8 @@ def test_framework_training_config_required_args(ecr_prefix, sagemaker_session): "sagemaker_container_log_level": "20", "sagemaker_job_name": '"sagemaker-tensorflow-%s"' % TIME_STAMP, "sagemaker_region": '"us-west-2"', - "checkpoint_path": '"s3://output/sagemaker-tensorflow-%s/checkpoints"' % TIME_STAMP, "training_steps": "1000", "evaluation_steps": "100", - "sagemaker_requirements": '""', }, "S3Operations": { "S3Upload": [ @@ -265,12 +262,14 @@ def test_framework_training_config_all_args(ecr_prefix, sagemaker_session): enable_cloudwatch_metrics=False, container_log_level="{{ log_level }}", code_location="s3://{{ bucket_name }}/{{ prefix }}", - training_steps=1000, - evaluation_steps=100, - checkpoint_path="{{ checkpoint_path }}", + hyperparameters={ + "training_steps": 1000, + "evaluation_steps": 100, + "checkpoint_path": "{{ checkpoint_path }}", + "sagemaker_requirements": "", + }, py_version="py2", framework_version="1.10.0", - requirements_file="", role="{{ role }}", train_instance_count="{{ instance_count }}", train_instance_type="ml.c4.2xlarge", diff --git a/tests/unit/test_sync_directories.py b/tests/unit/test_sync_directories.py deleted file mode 100644 index 63c8a2e24c..0000000000 --- a/tests/unit/test_sync_directories.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright 2017-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). You -# may not use this file except in compliance with the License. A copy of -# the License is located at -# -# http://aws.amazon.com/apache2.0/ -# -# or in the "license" file accompanying this file. This file is -# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF -# ANY KIND, either express or implied. See the License for the specific -# language governing permissions and limitations under the License. 
-from __future__ import absolute_import - -import filecmp -import os -import random -import shutil - -from sagemaker.tensorflow.estimator import Tensorboard - - -def create_test_directory(directory, variable_content="hello world"): - """Create dummy data for testing Tensorboard._sync_directories with the - following structure: - - - |_ child_directory - |_ hello.txt - |_ foo1.txt - |_ foo2.txt - - Args: - directory (str): The path to a directory to create with fake files - variable_content (str): Content to put in one of the files - """ - child_dir = os.path.join(directory, "child_directory") - os.mkdir(child_dir) - with open(os.path.join(directory, "foo1.txt"), "w") as f: - f.write("bar1") - with open(os.path.join(directory, "foo2.txt"), "w") as f: - f.write("bar2") - with open(os.path.join(child_dir, "hello.txt"), "w") as f: - f.write(variable_content) - - -def same_dirs(a, b): - """Check that structure and files are the same for directories a and b - - Args: - a (str): The path to the first directory - b (str): The path to the second directory - """ - comp = filecmp.dircmp(a, b) - common = sorted(comp.common) - left = sorted(comp.left_list) - right = sorted(comp.right_list) - if left != common or right != common: - return False - if len(comp.diff_files): - return False - for subdir in comp.common_dirs: - left_subdir = os.path.join(a, subdir) - right_subdir = os.path.join(b, subdir) - return same_dirs(left_subdir, right_subdir) - return True - - -def test_to_directory_doesnt_exist(): - with Tensorboard._temporary_directory() as from_dir: - create_test_directory(from_dir) - to_dir = "./not_a_real_place_{}".format(random.getrandbits(64)) - Tensorboard._sync_directories(from_dir, to_dir) - assert same_dirs(from_dir, to_dir) - shutil.rmtree(to_dir) - - -def test_only_root_of_to_directory_exists(): - with Tensorboard._temporary_directory() as from_dir: - with Tensorboard._temporary_directory() as to_dir: - create_test_directory(from_dir) - assert not same_dirs(from_dir, to_dir) - Tensorboard._sync_directories(from_dir, to_dir) - assert same_dirs(from_dir, to_dir) - - -def test_files_are_overwritten_when_they_already_exist(): - with Tensorboard._temporary_directory() as from_dir: - with Tensorboard._temporary_directory() as to_dir: - create_test_directory(from_dir) - create_test_directory(to_dir, "foo bar") - assert not same_dirs(from_dir, to_dir) - Tensorboard._sync_directories(from_dir, to_dir) - assert same_dirs(from_dir, to_dir) diff --git a/tests/unit/test_tf_estimator.py b/tests/unit/test_tf_estimator.py index d00523c357..84caff2421 100644 --- a/tests/unit/test_tf_estimator.py +++ b/tests/unit/test_tf_estimator.py @@ -19,19 +19,13 @@ import pytest from mock import patch, Mock, MagicMock -from sagemaker.fw_utils import create_image_uri -from sagemaker.estimator import _TrainingJob -from sagemaker.model import MODEL_SERVER_WORKERS_PARAM_NAME -from sagemaker.session import s3_input from sagemaker.tensorflow import defaults, serving, TensorFlow, TensorFlowModel, TensorFlowPredictor -import sagemaker.tensorflow.estimator as tfe DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") SCRIPT_FILE = "dummy_script.py" SCRIPT_PATH = os.path.join(DATA_DIR, SCRIPT_FILE) SERVING_SCRIPT_FILE = "another_dummy_script.py" MODEL_DATA = "s3://some/data.tar.gz" -REQUIREMENTS_FILE = "dummy_requirements.txt" TIMESTAMP = "2017-11-06-14:14:15.673" TIME = 1510006209.073025 BUCKET_NAME = "mybucket" @@ -111,16 +105,12 @@ def _hyperparameters(script_mode=False, horovod=False): "sagemaker_job_name": 
json.dumps(job_name), "sagemaker_region": json.dumps("us-west-2"), } - if script_mode: - if horovod: - hps["model_dir"] = json.dumps("/opt/ml/model") - else: - hps["model_dir"] = json.dumps("s3://{}/{}/model".format(BUCKET_NAME, job_name)) + + if horovod: + hps["model_dir"] = json.dumps("/opt/ml/model") else: - hps["checkpoint_path"] = json.dumps("s3://{}/{}/checkpoints".format(BUCKET_NAME, job_name)) - hps["training_steps"] = "1000" - hps["evaluation_steps"] = "10" - hps["sagemaker_requirements"] = '"{}"'.format(REQUIREMENTS_FILE) + hps["model_dir"] = json.dumps("s3://{}/{}/model".format(BUCKET_NAME, job_name)) + return hps @@ -175,95 +165,40 @@ def _build_tf( sagemaker_session, framework_version=defaults.TF_VERSION, train_instance_type=None, - checkpoint_path=None, base_job_name=None, - training_steps=None, - evaluation_steps=None, **kwargs ): return TensorFlow( entry_point=SCRIPT_PATH, - training_steps=training_steps, - evaluation_steps=evaluation_steps, framework_version=framework_version, role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=train_instance_type if train_instance_type else INSTANCE_TYPE, - checkpoint_path=checkpoint_path, base_job_name=base_job_name, **kwargs ) -def test_tf_support_cpu_instances(sagemaker_session, tf_version): +def test_tf_cpu_images(sagemaker_session, tf_version): tf = _build_tf(sagemaker_session, tf_version, train_instance_type="ml.c2.2xlarge") - assert tf.train_image() == _get_full_cpu_image_uri(tf_version) tf = _build_tf(sagemaker_session, tf_version, train_instance_type="ml.c4.2xlarge") - assert tf.train_image() == _get_full_cpu_image_uri(tf_version) tf = _build_tf(sagemaker_session, tf_version, train_instance_type="ml.m16") - assert tf.train_image() == _get_full_cpu_image_uri(tf_version) -def test_tf_support_gpu_instances(sagemaker_session, tf_version): +def test_tf_gpu_images(sagemaker_session, tf_version): tf = _build_tf(sagemaker_session, tf_version, train_instance_type="ml.g2.2xlarge") - assert tf.train_image() == _get_full_gpu_image_uri(tf_version) tf = _build_tf(sagemaker_session, tf_version, train_instance_type="ml.p2.2xlarge") - assert tf.train_image() == _get_full_gpu_image_uri(tf_version) -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_deploy_model_server_workers(sagemaker_session): - tf = _build_tf(sagemaker_session) - tf.fit(inputs=s3_input("s3://mybucket/train")) - - tf.deploy(initial_instance_count=1, instance_type="ml.c2.2xlarge", model_server_workers=2) - - assert ( - "2" - == sagemaker_session.method_calls[3][1][2]["Environment"][ - MODEL_SERVER_WORKERS_PARAM_NAME.upper() - ] - ) - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_deploy_model_server_workers_unset(sagemaker_session): - tf = _build_tf(sagemaker_session) - tf.fit(inputs=s3_input("s3://mybucket/train")) - - tf.deploy(initial_instance_count=1, instance_type="ml.c2.2xlarge") - - assert ( - MODEL_SERVER_WORKERS_PARAM_NAME.upper() - not in sagemaker_session.method_calls[3][1][2]["Environment"] - ) - - -def test_tf_invalid_requirements_path(sagemaker_session): - requirements_file = "/foo/bar/requirements.txt" - with pytest.raises(ValueError) as e: - _build_tf(sagemaker_session, requirements_file=requirements_file, source_dir=DATA_DIR) - assert "Requirements file {} is not a path relative to source_dir.".format( - requirements_file - ) in str(e.value) - - -def test_tf_nonexistent_requirements_path(sagemaker_session): - requirements_file = "nonexistent_requirements.txt" - with 
pytest.raises(ValueError) as e: - _build_tf(sagemaker_session, requirements_file=requirements_file, source_dir=DATA_DIR) - assert "Requirements file {} does not exist.".format(requirements_file) in str(e.value) - - def test_create_model(sagemaker_session, tf_version): container_log_level = '"logging.INFO"' source_dir = "s3://mybucket/source" @@ -271,8 +206,6 @@ def test_create_model(sagemaker_session, tf_version): entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, - training_steps=1000, - evaluation_steps=10, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, framework_version=tf_version, @@ -306,8 +239,6 @@ def test_create_model_with_optional_params(sagemaker_session): entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, - training_steps=1000, - evaluation_steps=10, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, container_log_level=container_log_level, @@ -320,12 +251,10 @@ def test_create_model_with_optional_params(sagemaker_session): tf.fit(inputs="s3://mybucket/train", job_name=job_name) new_role = "role" - model_server_workers = 2 vpc_config = {"Subnets": ["foo"], "SecurityGroupIds": ["bar"]} model_name = "model-name" model = tf.create_model( role=new_role, - model_server_workers=model_server_workers, vpc_config_override=vpc_config, entry_point=SERVING_SCRIPT_FILE, name=model_name, @@ -333,128 +262,12 @@ def test_create_model_with_optional_params(sagemaker_session): ) assert model.role == new_role - assert model.model_server_workers == model_server_workers assert model.vpc_config == vpc_config assert model.entry_point == SERVING_SCRIPT_FILE assert model.name == model_name assert model.enable_network_isolation() -@patch("sagemaker.tensorflow.estimator.TensorFlow.create_model") -def test_transformer_creation_with_optional_args(create_model, sagemaker_session): - model = Mock() - create_model.return_value = model - - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - tf.latest_training_job = _TrainingJob(sagemaker_session, "some-job-name") - - strategy = "SingleRecord" - assemble_with = "Line" - output_path = "s3://{}/batch-output".format(BUCKET_NAME) - kms_key = "kms" - accept_type = "text/bytes" - env = {"foo": "bar"} - max_concurrent_transforms = 3 - max_payload = 100 - tags = {"Key": "foo", "Value": "bar"} - new_role = "role" - model_server_workers = 2 - vpc_config = {"Subnets": ["1234"], "SecurityGroupIds": ["5678"]} - model_name = "model-name" - - tf.transformer( - INSTANCE_COUNT, - INSTANCE_TYPE, - strategy=strategy, - assemble_with=assemble_with, - output_path=output_path, - output_kms_key=kms_key, - accept=accept_type, - env=env, - max_concurrent_transforms=max_concurrent_transforms, - max_payload=max_payload, - tags=tags, - role=new_role, - model_server_workers=model_server_workers, - volume_kms_key=kms_key, - endpoint_type="tensorflow-serving", - entry_point=SERVING_SCRIPT_FILE, - vpc_config_override=vpc_config, - enable_network_isolation=True, - model_name=model_name, - ) - - create_model.assert_called_with( - model_server_workers=model_server_workers, - role=new_role, - vpc_config_override=vpc_config, - endpoint_type="tensorflow-serving", - entry_point=SERVING_SCRIPT_FILE, - enable_network_isolation=True, - name=model_name, - ) - model.transformer.assert_called_with( - INSTANCE_COUNT, - INSTANCE_TYPE, - accept=accept_type, - assemble_with=assemble_with, - env=env, 
- max_concurrent_transforms=max_concurrent_transforms, - max_payload=max_payload, - output_kms_key=kms_key, - output_path=output_path, - strategy=strategy, - tags=tags, - volume_kms_key=kms_key, - ) - - -@patch("sagemaker.tensorflow.estimator.TensorFlow.create_model") -def test_transformer_creation_without_optional_args(create_model, sagemaker_session): - model = Mock() - create_model.return_value = model - - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - tf.latest_training_job = _TrainingJob(sagemaker_session, "some-job-name") - tf.transformer(INSTANCE_COUNT, INSTANCE_TYPE) - - create_model.assert_called_with( - endpoint_type=None, - model_server_workers=None, - role=ROLE, - vpc_config_override="VPC_CONFIG_DEFAULT", - entry_point=None, - enable_network_isolation=False, - name=None, - ) - model.transformer.assert_called_with( - INSTANCE_COUNT, - INSTANCE_TYPE, - accept=None, - assemble_with=None, - env=None, - max_concurrent_transforms=None, - max_payload=None, - output_kms_key=None, - output_path=None, - strategy=None, - tags=None, - volume_kms_key=None, - ) - - def test_create_model_with_custom_image(sagemaker_session): container_log_level = '"logging.INFO"' source_dir = "s3://mybucket/source" @@ -463,8 +276,6 @@ def test_create_model_with_custom_image(sagemaker_session): entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, - training_steps=1000, - evaluation_steps=10, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, image_name=custom_image, @@ -480,83 +291,6 @@ def test_create_model_with_custom_image(sagemaker_session): assert model.image == custom_image -@patch("sagemaker.utils.create_tar_file", MagicMock()) -@patch("time.strftime", MagicMock(return_value=TIMESTAMP)) -@patch("time.time", MagicMock(return_value=TIME)) -def test_tf(sagemaker_session, tf_version): - tf = TensorFlow( - entry_point=SCRIPT_FILE, - role=ROLE, - sagemaker_session=sagemaker_session, - training_steps=1000, - evaluation_steps=10, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - framework_version=tf_version, - requirements_file=REQUIREMENTS_FILE, - source_dir=DATA_DIR, - ) - - inputs = "s3://mybucket/train" - - tf.fit(inputs=inputs, experiment_config=EXPERIMENT_CONFIG) - - call_names = [c[0] for c in sagemaker_session.method_calls] - assert call_names == ["train", "logs_for_job"] - - expected_train_args = _create_train_job(tf_version) - expected_train_args["input_config"][0]["DataSource"]["S3DataSource"]["S3Uri"] = inputs - expected_train_args["experiment_config"] = EXPERIMENT_CONFIG - - actual_train_args = sagemaker_session.method_calls[0][2] - assert actual_train_args == expected_train_args - - model = tf.create_model() - - environment = { - "Environment": { - "SAGEMAKER_SUBMIT_DIRECTORY": "s3://mybucket/sagemaker-tensorflow-2017-11-06-14:14:15.673/source/sourcedir.tar.gz", # noqa: E501 - "SAGEMAKER_PROGRAM": "dummy_script.py", - "SAGEMAKER_REQUIREMENTS": "dummy_requirements.txt", - "SAGEMAKER_ENABLE_CLOUDWATCH_METRICS": "false", - "SAGEMAKER_REGION": "us-west-2", - "SAGEMAKER_CONTAINER_LOG_LEVEL": "20", - }, - "Image": create_image_uri("us-west-2", "tensorflow", INSTANCE_TYPE, tf_version, "py2"), - "ModelDataUrl": "s3://m/m.tar.gz", - } - assert environment == model.prepare_container_def(INSTANCE_TYPE) - - assert "cpu" in model.prepare_container_def(INSTANCE_TYPE)["Image"] - predictor = tf.deploy(1, 
INSTANCE_TYPE) - assert isinstance(predictor, TensorFlowPredictor) - - -@patch("time.strftime", return_value=TIMESTAMP) -@patch("time.time", return_value=TIME) -@patch("subprocess.Popen") -@patch("subprocess.call") -@patch("os.access", return_value=False) -def test_run_tensorboard_locally_without_tensorboard_binary( - time, strftime, popen, call, access, sagemaker_session -): - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - - with pytest.raises(EnvironmentError) as error: - tf.fit(inputs="s3://mybucket/train", run_tensorboard_locally=True) - assert ( - str(error.value) - == "TensorBoard is not installed in the system. Please install TensorBoard using the " - "following command: \n pip install tensorboard" - ) - - @patch("sagemaker.utils.create_tar_file", MagicMock()) def test_model(sagemaker_session, tf_version): model = TensorFlowModel( @@ -575,151 +309,6 @@ def test_model_image_accelerator(sagemaker_session): assert container_def["Image"] == _get_full_cpu_image_uri_with_ei(defaults.TF_VERSION) -@patch("time.strftime", return_value=TIMESTAMP) -@patch("time.time", return_value=TIME) -@patch("subprocess.Popen") -@patch("subprocess.call") -@patch("os.access", side_effect=[False, True]) -def test_run_tensorboard_locally_without_awscli_binary( - time, strftime, popen, call, access, sagemaker_session -): - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - - with pytest.raises(EnvironmentError) as error: - tf.fit(inputs="s3://mybucket/train", run_tensorboard_locally=True) - assert ( - str(error.value) - == "The AWS CLI is not installed in the system. 
Please install the AWS CLI using the " - "following command: \n pip install awscli" - ) - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -@patch("sagemaker.tensorflow.estimator.Tensorboard._sync_directories") -@patch("tempfile.mkdtemp", return_value="/my/temp/folder") -@patch("shutil.rmtree") -@patch("os.access", return_value=True) -@patch("subprocess.call") -@patch("subprocess.Popen") -@patch("time.strftime", return_value=TIMESTAMP) -@patch("time.time", return_value=TIME) -@patch("time.sleep") -def test_run_tensorboard_locally( - sleep, time, strftime, popen, call, access, rmtree, mkdtemp, sync, sagemaker_session -): - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - - popen().poll.return_value = None - - tf.fit(inputs="s3://mybucket/train", run_tensorboard_locally=True) - - popen.assert_called_with( - ["tensorboard", "--logdir", "/my/temp/folder", "--host", "localhost", "--port", "6006"], - stderr=-1, - stdout=-1, - ) - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -@patch("sagemaker.tensorflow.estimator.Tensorboard._sync_directories") -@patch("tempfile.mkdtemp", return_value="/my/temp/folder") -@patch("shutil.rmtree") -@patch("socket.socket") -@patch("os.access", return_value=True) -@patch("subprocess.call") -@patch("subprocess.Popen") -@patch("time.strftime", return_value=TIMESTAMP) -@patch("time.time", return_value=TIME) -@patch("time.sleep") -def test_run_tensorboard_locally_port_in_use( - sleep, time, strftime, popen, call, access, socket, rmtree, mkdtemp, sync, sagemaker_session -): - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - ) - - popen().poll.side_effect = [-1, None] - - tf.fit(inputs="s3://mybucket/train", run_tensorboard_locally=True) - - popen.assert_any_call( - ["tensorboard", "--logdir", "/my/temp/folder", "--host", "localhost", "--port", "6006"], - stderr=-1, - stdout=-1, - ) - - popen.assert_any_call( - ["tensorboard", "--logdir", "/my/temp/folder", "--host", "localhost", "--port", "6007"], - stderr=-1, - stdout=-1, - ) - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_checkpoint_not_set(sagemaker_session): - job_name = "sagemaker-tensorflow-py2-gpu-2017-10-24-14-12-09" - tf = _build_tf( - sagemaker_session, - checkpoint_path=None, - base_job_name=job_name, - output_path="s3://{}/".format(sagemaker_session.default_bucket()), - ) - tf.fit(inputs=s3_input("s3://mybucket/train"), job_name=job_name) - - expected_result = '"s3://{}/{}/checkpoints"'.format( - sagemaker_session.default_bucket(), job_name - ) - assert tf.hyperparameters()["checkpoint_path"] == expected_result - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_training_and_evaluation_steps_not_set(sagemaker_session): - job_name = "sagemaker-tensorflow-py2-gpu-2017-10-24-14-12-09" - output_path = "s3://{}/output/{}/".format(sagemaker_session.default_bucket(), job_name) - - tf = _build_tf( - sagemaker_session, training_steps=None, evaluation_steps=None, output_path=output_path - ) - tf.fit(inputs=s3_input("s3://mybucket/train")) - assert tf.hyperparameters()["training_steps"] == "null" - assert tf.hyperparameters()["evaluation_steps"] == "null" - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_training_and_evaluation_steps(sagemaker_session): - job_name = 
"sagemaker-tensorflow-py2-gpu-2017-10-24-14-12-09" - output_path = "s3://{}/output/{}/".format(sagemaker_session.default_bucket(), job_name) - - tf = _build_tf( - sagemaker_session, training_steps=123, evaluation_steps=456, output_path=output_path - ) - tf.fit(inputs=s3_input("s3://mybucket/train")) - assert tf.hyperparameters()["training_steps"] == "123" - assert tf.hyperparameters()["evaluation_steps"] == "456" - - -@patch("sagemaker.utils.create_tar_file", MagicMock()) -def test_tf_checkpoint_set(sagemaker_session): - tf = _build_tf(sagemaker_session, checkpoint_path="s3://my_checkpoint_bucket") - assert tf.hyperparameters()["checkpoint_path"] == json.dumps("s3://my_checkpoint_bucket") - - @patch("sagemaker.utils.create_tar_file", MagicMock()) def test_train_image_default(sagemaker_session): tf = TensorFlow( @@ -730,7 +319,7 @@ def test_train_image_default(sagemaker_session): train_instance_type=INSTANCE_TYPE, ) - assert _get_full_cpu_image_uri(defaults.TF_VERSION) in tf.train_image() + assert _get_full_cpu_image_uri(defaults.TF_VERSION, repo=SM_IMAGE_REPO_NAME) == tf.train_image() @patch("sagemaker.utils.create_tar_file", MagicMock()) @@ -742,13 +331,10 @@ def test_attach(sagemaker_session, tf_version): "AlgorithmSpecification": {"TrainingInputMode": "File", "TrainingImage": training_image}, "HyperParameters": { "sagemaker_submit_directory": '"s3://some/sourcedir.tar.gz"', - "checkpoint_path": '"s3://other/1508872349"', "sagemaker_program": '"iris-dnn-classifier.py"', "sagemaker_enable_cloudwatch_metrics": "false", "sagemaker_container_log_level": '"logging.INFO"', "sagemaker_job_name": '"neo"', - "training_steps": "100", - "evaluation_steps": "10", }, "RoleArn": "arn:aws:iam::366:role/SageMakerRole", "ResourceConfig": { @@ -775,16 +361,12 @@ def test_attach(sagemaker_session, tf_version): assert estimator.train_instance_count == 1 assert estimator.train_max_run == 24 * 60 * 60 assert estimator.input_mode == "File" - assert estimator.training_steps == 100 - assert estimator.evaluation_steps == 10 assert estimator.input_mode == "File" assert estimator.base_job_name == "neo" assert estimator.output_path == "s3://place/output/neo" assert estimator.output_kms_key == "" - assert estimator.hyperparameters()["training_steps"] == "100" assert estimator.source_dir == "s3://some/sourcedir.tar.gz" assert estimator.entry_point == "iris-dnn-classifier.py" - assert estimator.checkpoint_path == "s3://other/1508872349" @patch("sagemaker.utils.create_tar_file", MagicMock()) @@ -796,13 +378,10 @@ def test_attach_new_repo_name(sagemaker_session, tf_version): "AlgorithmSpecification": {"TrainingInputMode": "File", "TrainingImage": training_image}, "HyperParameters": { "sagemaker_submit_directory": '"s3://some/sourcedir.tar.gz"', - "checkpoint_path": '"s3://other/1508872349"', "sagemaker_program": '"iris-dnn-classifier.py"', "sagemaker_enable_cloudwatch_metrics": "false", "sagemaker_container_log_level": '"logging.INFO"', "sagemaker_job_name": '"neo"', - "training_steps": "100", - "evaluation_steps": "10", }, "RoleArn": "arn:aws:iam::366:role/SageMakerRole", "ResourceConfig": { @@ -829,16 +408,12 @@ def test_attach_new_repo_name(sagemaker_session, tf_version): assert estimator.train_instance_count == 1 assert estimator.train_max_run == 24 * 60 * 60 assert estimator.input_mode == "File" - assert estimator.training_steps == 100 - assert estimator.evaluation_steps == 10 assert estimator.input_mode == "File" assert estimator.base_job_name == "neo" assert estimator.output_path == "s3://place/output/neo" assert 
estimator.output_kms_key == "" - assert estimator.hyperparameters()["training_steps"] == "100" assert estimator.source_dir == "s3://some/sourcedir.tar.gz" assert estimator.entry_point == "iris-dnn-classifier.py" - assert estimator.checkpoint_path == "s3://other/1508872349" assert estimator.train_image() == training_image @@ -849,13 +424,10 @@ def test_attach_old_container(sagemaker_session): "AlgorithmSpecification": {"TrainingInputMode": "File", "TrainingImage": training_image}, "HyperParameters": { "sagemaker_submit_directory": '"s3://some/sourcedir.tar.gz"', - "checkpoint_path": '"s3://other/1508872349"', "sagemaker_program": '"iris-dnn-classifier.py"', "sagemaker_enable_cloudwatch_metrics": "false", "sagemaker_container_log_level": '"logging.INFO"', "sagemaker_job_name": '"neo"', - "training_steps": "100", - "evaluation_steps": "10", }, "RoleArn": "arn:aws:iam::366:role/SageMakerRole", "ResourceConfig": { @@ -882,16 +454,12 @@ def test_attach_old_container(sagemaker_session): assert estimator.train_instance_count == 1 assert estimator.train_max_run == 24 * 60 * 60 assert estimator.input_mode == "File" - assert estimator.training_steps == 100 - assert estimator.evaluation_steps == 10 assert estimator.input_mode == "File" assert estimator.base_job_name == "neo" assert estimator.output_path == "s3://place/output/neo" assert estimator.output_kms_key == "" - assert estimator.hyperparameters()["training_steps"] == "100" assert estimator.source_dir == "s3://some/sourcedir.tar.gz" assert estimator.entry_point == "iris-dnn-classifier.py" - assert estimator.checkpoint_path == "s3://other/1508872349" def test_attach_wrong_framework(sagemaker_session): @@ -905,7 +473,6 @@ def test_attach_wrong_framework(sagemaker_session): "sagemaker_program": '"iris-dnn-classifier.py"', "sagemaker_enable_cloudwatch_metrics": "false", "sagemaker_container_log_level": '"logging.INFO"', - "training_steps": "100", }, "RoleArn": "arn:aws:iam::366:role/SageMakerRole", "ResourceConfig": { @@ -935,13 +502,10 @@ def test_attach_custom_image(sagemaker_session): "AlgorithmSpecification": {"TrainingInputMode": "File", "TrainingImage": training_image}, "HyperParameters": { "sagemaker_submit_directory": '"s3://some/sourcedir.tar.gz"', - "checkpoint_path": '"s3://other/1508872349"', "sagemaker_program": '"iris-dnn-classifier.py"', "sagemaker_enable_cloudwatch_metrics": "false", "sagemaker_container_log_level": '"logging.INFO"', "sagemaker_job_name": '"neo"', - "training_steps": "100", - "evaluation_steps": "10", }, "RoleArn": "arn:aws:iam::366:role/SageMakerRole", "ResourceConfig": { @@ -977,7 +541,7 @@ def test_estimator_py2_deprecation_warning(warning, sagemaker_session): ) assert estimator.py_version == "py2" - warning.assert_called_with(estimator.__framework_name__, defaults.LATEST_PY2_VERSION) + warning.assert_called_with("tensorflow", "2.1.0") model = TensorFlowModel( MODEL_DATA, @@ -1016,47 +580,6 @@ def test_empty_framework_version(warning, sagemaker_session): warning.assert_called_with(defaults.TF_VERSION, defaults.LATEST_VERSION) -def _deprecated_args_msg(args): - return "{} are deprecated in script mode. 
Please do not set {}.".format( - ", ".join(tfe._FRAMEWORK_MODE_ARGS), args - ) - - -def test_script_mode_deprecated_args(sagemaker_session): - with pytest.raises(AttributeError) as e: - _build_tf( - sagemaker_session=sagemaker_session, py_version="py3", checkpoint_path="some_path" - ) - assert _deprecated_args_msg("checkpoint_path") in str(e.value) - - with pytest.raises(AttributeError) as e: - _build_tf(sagemaker_session=sagemaker_session, py_version="py3", training_steps=1) - assert _deprecated_args_msg("training_steps") in str(e.value) - - with pytest.raises(AttributeError) as e: - _build_tf(sagemaker_session=sagemaker_session, script_mode=True, evaluation_steps=1) - assert _deprecated_args_msg("evaluation_steps") in str(e.value) - - with pytest.raises(AttributeError) as e: - _build_tf( - sagemaker_session=sagemaker_session, script_mode=True, requirements_file="some_file" - ) - assert _deprecated_args_msg("requirements_file") in str(e.value) - - with pytest.raises(AttributeError) as e: - _build_tf( - sagemaker_session=sagemaker_session, - script_mode=True, - checkpoint_path="some_path", - requirements_file="some_file", - training_steps=1, - evaluation_steps=1, - ) - assert _deprecated_args_msg( - "training_steps, evaluation_steps, requirements_file, checkpoint_path" - ) in str(e.value) - - def test_py2_version_deprecated(sagemaker_session): with pytest.raises(AttributeError) as e: TensorFlow( @@ -1097,33 +620,9 @@ def test_py3_is_default_version_before_tf1_14(sagemaker_session): assert estimator.py_version == "py2" -def test_legacy_mode_deprecated(sagemaker_session): - tf = _build_tf( - sagemaker_session=sagemaker_session, - framework_version="1.13.1", - py_version="py2", - script_mode=False, - ) - assert tf._script_mode_enabled() is True - - tf = _build_tf( - sagemaker_session=sagemaker_session, - framework_version="1.12", - py_version="py2", - script_mode=False, - ) - assert tf._script_mode_enabled() is False - - -def test_script_mode_enabled(sagemaker_session): - tf = _build_tf(sagemaker_session=sagemaker_session, py_version="py3") - assert tf._script_mode_enabled() is True - - tf = _build_tf(sagemaker_session=sagemaker_session, script_mode=True) - assert tf._script_mode_enabled() is True - - tf = _build_tf(sagemaker_session=sagemaker_session) - assert tf._script_mode_enabled() is False +def test_legacy_mode_framework_name(sagemaker_session): + tf = _build_tf(sagemaker_session=sagemaker_session, framework_version="1.10") + assert tf.__framework_name__ == "tensorflow" def test_script_mode_create_model(sagemaker_session): @@ -1145,32 +644,6 @@ def test_script_mode_create_model(sagemaker_session): assert model.enable_network_isolation() -@patch("sagemaker.utils.create_tar_file", MagicMock()) -@patch("sagemaker.tensorflow.estimator.Tensorboard._sync_directories") -@patch("sagemaker.tensorflow.estimator.Tensorboard.start") -@patch("os.access", return_value=True) -@patch("subprocess.call") -@patch("subprocess.Popen") -@patch("time.strftime", return_value=TIMESTAMP) -@patch("time.time", return_value=TIME) -@patch("time.sleep") -def test_script_mode_tensorboard( - sleep, time, strftime, popen, call, access, start, sync, sagemaker_session -): - tf = TensorFlow( - entry_point=SCRIPT_PATH, - role=ROLE, - sagemaker_session=sagemaker_session, - train_instance_count=INSTANCE_COUNT, - train_instance_type=INSTANCE_TYPE, - framework_version="1.0", - script_mode=True, - ) - popen().poll.return_value = None - tf.fit(inputs="s3://mybucket/train", run_tensorboard_locally=True) - start.assert_not_called() 
- - @patch("time.strftime", return_value=TIMESTAMP) @patch("time.time", return_value=TIME) @patch("sagemaker.utils.create_tar_file", MagicMock()) diff --git a/tests/unit/test_tfs.py b/tests/unit/test_tfs.py index b24a5a742f..bb8da27ef8 100644 --- a/tests/unit/test_tfs.py +++ b/tests/unit/test_tfs.py @@ -253,8 +253,6 @@ def test_estimator_deploy(sagemaker_session): entry_point="script.py", role=ROLE, sagemaker_session=sagemaker_session, - training_steps=1000, - evaluation_steps=10, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, image_name=custom_image, From 859011628d3e228f75ad1807b0d47612f620045f Mon Sep 17 00:00:00 2001 From: Lauren Yu <6631887+laurenyu@users.noreply.github.com> Date: Tue, 19 May 2020 11:07:35 -0700 Subject: [PATCH 2/2] remove TF legacy mode local mode tests --- tests/integ/test_local_mode.py | 195 +++++---------------------------- 1 file changed, 26 insertions(+), 169 deletions(-) diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index 215a3684d0..4f29b7bbd3 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -27,7 +27,6 @@ from sagemaker.local import LocalSession, LocalSagemakerRuntimeClient, LocalSagemakerClient from sagemaker.mxnet import MXNet -from sagemaker.tensorflow import TensorFlow # endpoint tests all use the same port, so we use this lock to prevent concurrent execution LOCK_PATH = os.path.join(tempfile.gettempdir(), "sagemaker_test_local_mode_lock") @@ -84,174 +83,6 @@ def _create_model(output_path): return _create_model -@pytest.mark.local_mode -@pytest.mark.skipif(PYTHON_VERSION != "py2", reason="TensorFlow image supports only python 2.") -def test_tf_local_mode(sagemaker_local_session): - with stopit.ThreadingTimeout(5 * 60, swallow_exc=False): - script_path = os.path.join(DATA_DIR, "iris", "iris-dnn-classifier.py") - - estimator = TensorFlow( - entry_point=script_path, - role="SageMakerRole", - framework_version="1.12", - training_steps=1, - evaluation_steps=1, - hyperparameters={"input_tensor_name": "inputs"}, - train_instance_count=1, - train_instance_type="local", - base_job_name="test-tf", - sagemaker_session=sagemaker_local_session, - ) - - inputs = estimator.sagemaker_session.upload_data( - path=DATA_PATH, key_prefix="integ-test-data/tf_iris" - ) - estimator.fit(inputs) - print("job succeeded: {}".format(estimator.latest_training_job.name)) - - endpoint_name = estimator.latest_training_job.name - with lock.lock(LOCK_PATH): - try: - json_predictor = estimator.deploy( - initial_instance_count=1, instance_type="local", endpoint_name=endpoint_name - ) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({"inputs": features}) - print("predict result: {}".format(dict_result)) - list_result = json_predictor.predict(features) - print("predict result: {}".format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - - -@pytest.mark.local_mode -@pytest.mark.skipif(PYTHON_VERSION != "py2", reason="TensorFlow image supports only python 2.") -def test_tf_distributed_local_mode(sagemaker_local_session): - with stopit.ThreadingTimeout(5 * 60, swallow_exc=False): - script_path = os.path.join(DATA_DIR, "iris", "iris-dnn-classifier.py") - - estimator = TensorFlow( - entry_point=script_path, - role="SageMakerRole", - framework_version="1.12", - training_steps=1, - evaluation_steps=1, - hyperparameters={"input_tensor_name": "inputs"}, - train_instance_count=3, - train_instance_type="local", - base_job_name="test-tf", - 
sagemaker_session=sagemaker_local_session, - ) - - inputs = "file://" + DATA_PATH - estimator.fit(inputs) - print("job succeeded: {}".format(estimator.latest_training_job.name)) - - endpoint_name = estimator.latest_training_job.name - - with lock.lock(LOCK_PATH): - try: - json_predictor = estimator.deploy( - initial_instance_count=1, instance_type="local", endpoint_name=endpoint_name - ) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({"inputs": features}) - print("predict result: {}".format(dict_result)) - list_result = json_predictor.predict(features) - print("predict result: {}".format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - - -@pytest.mark.local_mode -@pytest.mark.skipif(PYTHON_VERSION != "py2", reason="TensorFlow image supports only python 2.") -def test_tf_local_data(sagemaker_local_session): - with stopit.ThreadingTimeout(5 * 60, swallow_exc=False): - script_path = os.path.join(DATA_DIR, "iris", "iris-dnn-classifier.py") - - estimator = TensorFlow( - entry_point=script_path, - role="SageMakerRole", - framework_version="1.12", - training_steps=1, - evaluation_steps=1, - hyperparameters={"input_tensor_name": "inputs"}, - train_instance_count=1, - train_instance_type="local", - base_job_name="test-tf", - sagemaker_session=sagemaker_local_session, - ) - - inputs = "file://" + DATA_PATH - estimator.fit(inputs) - print("job succeeded: {}".format(estimator.latest_training_job.name)) - - endpoint_name = estimator.latest_training_job.name - with lock.lock(LOCK_PATH): - try: - json_predictor = estimator.deploy( - initial_instance_count=1, instance_type="local", endpoint_name=endpoint_name - ) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({"inputs": features}) - print("predict result: {}".format(dict_result)) - list_result = json_predictor.predict(features) - print("predict result: {}".format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - - -@pytest.mark.local_mode -@pytest.mark.skipif(PYTHON_VERSION != "py2", reason="TensorFlow image supports only python 2.") -def test_tf_local_data_local_script(): - with stopit.ThreadingTimeout(5 * 60, swallow_exc=False): - script_path = os.path.join(DATA_DIR, "iris", "iris-dnn-classifier.py") - - estimator = TensorFlow( - entry_point=script_path, - role="SageMakerRole", - framework_version="1.12", - training_steps=1, - evaluation_steps=1, - hyperparameters={"input_tensor_name": "inputs"}, - train_instance_count=1, - train_instance_type="local", - base_job_name="test-tf", - sagemaker_session=LocalNoS3Session(), - ) - - inputs = "file://" + DATA_PATH - - estimator.fit(inputs) - print("job succeeded: {}".format(estimator.latest_training_job.name)) - - endpoint_name = estimator.latest_training_job.name - with lock.lock(LOCK_PATH): - try: - json_predictor = estimator.deploy( - initial_instance_count=1, instance_type="local", endpoint_name=endpoint_name - ) - - features = [6.4, 3.2, 4.5, 1.5] - dict_result = json_predictor.predict({"inputs": features}) - print("predict result: {}".format(dict_result)) - list_result = json_predictor.predict(features) - print("predict result: {}".format(list_result)) - - assert dict_result == list_result - finally: - estimator.delete_endpoint() - - @pytest.mark.local_mode def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model, mxnet_full_version): path = "s3://%s" % sagemaker_local_session.default_bucket() @@ -320,6 +151,32 @@ def 
test_mxnet_local_mode(sagemaker_local_session, mxnet_full_version): mx.delete_endpoint() +@pytest.mark.local_mode +def test_mxnet_distributed_local_mode(sagemaker_local_session, mxnet_full_version): + script_path = os.path.join(DATA_DIR, "mxnet_mnist", "mnist.py") + data_path = os.path.join(DATA_DIR, "mxnet_mnist") + + mx = MXNet( + entry_point=script_path, + role="SageMakerRole", + py_version=PYTHON_VERSION, + train_instance_count=2, + train_instance_type="local", + sagemaker_session=sagemaker_local_session, + framework_version=mxnet_full_version, + distributions={"parameter_server": {"enabled": True}}, + ) + + train_input = mx.sagemaker_session.upload_data( + path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train" + ) + test_input = mx.sagemaker_session.upload_data( + path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test" + ) + + mx.fit({"train": train_input, "test": test_input}) + + @pytest.mark.local_mode def test_mxnet_local_data_local_script(mxnet_full_version): data_path = os.path.join(DATA_DIR, "mxnet_mnist")
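
For reference, a minimal sketch of constructing the estimator after this change, assuming a hypothetical entry-point script, role ARN, and S3 bucket: the removed legacy-mode constructor arguments (training_steps, evaluation_steps, checkpoint_path) move into the generic hyperparameters dict instead, mirroring the updated tests in tests/unit/test_airflow.py.

    from sagemaker.tensorflow import TensorFlow

    # Script mode is now the default (script_mode=True); for framework_version
    # <= 1.10.0 the estimator logs a warning and falls back to legacy-mode images.
    estimator = TensorFlow(
        entry_point="train.py",  # hypothetical training script
        role="arn:aws:iam::111122223333:role/SageMakerRole",  # hypothetical role
        framework_version="1.10.0",
        py_version="py2",
        train_instance_count=1,
        train_instance_type="ml.c4.xlarge",
        # Former legacy-mode constructor parameters travel as plain
        # hyperparameters, which the legacy containers already read from this
        # dict (the old estimator injected them there on the user's behalf).
        hyperparameters={
            "training_steps": 1000,
            "evaluation_steps": 100,
            "checkpoint_path": "s3://my-bucket/checkpoints",  # hypothetical bucket
        },
    )

For framework_version >= 1.13.1 the estimator forces script mode, and any entries in hyperparameters are simply forwarded to the user's training script.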