
Commit 8a61a7c

mvsusp and uditbhatia authored and committed

Support of Horovod and TF 1.12 for TensorFlow Script Mode. TFS 1.12 support (#567)

* Support of Horovod and TF 1.12 for TensorFlow Script Mode

1 parent f341864 · commit 8a61a7c

10 files changed, +260 −23 lines changed

CHANGELOG.rst

Lines changed: 10 additions & 2 deletions

@@ -2,11 +2,13 @@
 CHANGELOG
 =========

-1.17.3.dev
-==========
+
+1.17.3
+======

 * bug-fix: Handle StopIteration in CloudWatch Logs retrieval
 * feature: Update EI TensorFlow latest version to 1.12
+* feature: Support for Horovod

 1.17.2
 ======
@@ -19,10 +21,15 @@ CHANGELOG
 * enhancement: Workflow: Specify tasks from which training/tuning operator to transform/deploy in related operators
 * feature: Supporting inter-container traffic encryption flag

+
 1.17.0
 ======

+
 * bug-fix: Workflow: Revert appending Airflow retry id to default job name
+* feature: support for Tensorflow 1.12
+* feature: support for Tensorflow Serving 1.12
+* bug-fix: Revert appending Airflow retry id to default job name
 * bug-fix: Session: don't allow get_execution_role() to return an ARN that's not a role but has "role" in the name
 * bug-fix: Remove ``__all__`` from ``__init__.py`` files
 * doc-fix: Add TFRecord split type to docs
@@ -35,6 +42,7 @@ CHANGELOG
 * enhancement: Add Model.transformer()
 * bug-fix: HyperparameterTuner: make ``include_cls_metadata`` default to ``False`` for everything except Frameworks

+
 1.16.3
 ======

src/sagemaker/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -39,4 +39,4 @@
 from sagemaker.session import s3_input  # noqa: F401
 from sagemaker.session import get_execution_role  # noqa: F401

-__version__ = '1.17.2'
+__version__ = '1.17.3'

src/sagemaker/estimator.py

Lines changed: 3 additions & 0 deletions

@@ -749,6 +749,9 @@ class Framework(EstimatorBase):

     __framework_name__ = None
     LAUNCH_PS_ENV_NAME = 'sagemaker_parameter_server_enabled'
+    LAUNCH_MPI_ENV_NAME = 'sagemaker_mpi_enabled'
+    MPI_NUM_PROCESSES_PER_HOST = 'sagemaker_mpi_num_of_processes_per_host'
+    MPI_CUSTOM_MPI_OPTIONS = 'sagemaker_mpi_custom_mpi_options'

     def __init__(self, entry_point, source_dir=None, hyperparameters=None, enable_cloudwatch_metrics=False,
                  container_log_level=logging.INFO, code_location=None, image_name=None, dependencies=None, **kwargs):
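These Framework-level constants are the hyperparameter keys the Script Mode containers read. A minimal sketch of how a ``distributions`` dict maps onto them, mirroring the logic added to ``src/sagemaker/tensorflow/estimator.py`` further down in this commit (the dict literal here is illustrative, not part of the diff):

    # Illustrative only: reproduces the mapping performed in
    # TensorFlow.hyperparameters() later in this commit.
    distributions = {'mpi': {'enabled': True, 'processes_per_host': 2}}

    mpi_dict = distributions['mpi']
    additional_hyperparameters = {
        'sagemaker_mpi_enabled': mpi_dict.get('enabled', False),                           # LAUNCH_MPI_ENV_NAME
        'sagemaker_mpi_num_of_processes_per_host': mpi_dict.get('processes_per_host', 1),  # MPI_NUM_PROCESSES_PER_HOST
        'sagemaker_mpi_custom_mpi_options': mpi_dict.get('custom_mpi_options', ''),        # MPI_CUSTOM_MPI_OPTIONS
    }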

src/sagemaker/tensorflow/README.rst

Lines changed: 66 additions & 6 deletions

@@ -207,6 +207,67 @@ After attaching, the estimator can be deployed as usual.

     tf_estimator = TensorFlow.attach(training_job_name=training_job_name)

+Distributed Training
+''''''''''''''''''''
+
+To run your training job with multiple instances in a distributed fashion, set ``train_instance_count``
+to a number larger than 1. We support two different types of distributed training: parameter server and Horovod.
+Use the ``distributions`` parameter to configure which distributed training strategy to use.
+
+Training with parameter servers
+"""""""""""""""""""""""""""""""
+
+If you specify ``parameter_server`` as the value of the ``distributions`` parameter, the container launches a parameter server
+thread on each instance in the training cluster, and then executes your training code. You can find more information on
+TensorFlow distributed training in the `TensorFlow docs <https://www.tensorflow.org/deploy/distributed>`__.
+To enable parameter server training:
+
+.. code:: python
+
+  from sagemaker.tensorflow import TensorFlow
+
+  tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
+                            train_instance_count=2, train_instance_type='ml.p2.xlarge',
+                            framework_version='1.11', py_version='py3',
+                            distributions={'parameter_server': {'enabled': True}})
+  tf_estimator.fit('s3://bucket/path/to/training/data')
+
+Training with Horovod
+"""""""""""""""""""""
+
+Horovod is a distributed training framework based on MPI. You can find more details in the `Horovod README <https://github.com/uber/horovod>`__.
+
+The container sets up the MPI environment and executes the ``mpirun`` command, enabling you to run any Horovod
+training script with Script Mode.
+
+Training with ``MPI`` is configured by specifying the following fields in ``distributions``:
+
+- ``enabled (bool)``: If set to ``True``, the MPI setup is performed and the ``mpirun`` command is executed.
+- ``processes_per_host (int)``: Number of processes MPI should launch on each host. This should not be
+  greater than the available slots on the selected instance type, and should be set for multi-CPU/multi-GPU
+  training.
+- ``custom_mpi_options (str)``: Any ``mpirun`` flag(s) passed in this field are added to the ``mpirun``
+  command executed by SageMaker to launch distributed Horovod training.
+
+In the example below, we create an estimator to launch Horovod distributed training with 2 processes on one host:
+
+.. code:: python
+
+  from sagemaker.tensorflow import TensorFlow
+
+  tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
+                            train_instance_count=1, train_instance_type='ml.p2.xlarge',
+                            framework_version='1.12', py_version='py3',
+                            distributions={
+                                'mpi': {
+                                    'enabled': True,
+                                    'processes_per_host': 2,
+                                    'custom_mpi_options': '--NCCL_DEBUG INFO'
+                                }
+                            })
+  tf_estimator.fit('s3://bucket/path/to/training/data')
+
 sagemaker.tensorflow.TensorFlow class
 '''''''''''''''''''''''''''''''''''''

@@ -277,11 +338,10 @@ Optional:
 - ``model_dir (str)`` Location where model data, checkpoint data, and TensorBoard checkpoints should be saved during training.
   If not specified, an S3 location will be generated under the training job's default bucket, and ``model_dir`` will be
   passed to your training script as one of the command line arguments.
-- ``distributions (dict)`` Configure your distrubtion strategy with this argument. For launching parameter server for
-  for distributed training, you must set ``distributions`` to ``{'parameter_server': {'enabled': True}}``
+- ``distributions (dict)`` Configure your distribution strategy with this argument.

 Training with Pipe Mode using PipeModeDataset
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 Amazon SageMaker allows users to create training jobs using Pipe input mode.
 With Pipe input mode, your dataset is streamed directly to your training instances instead of being downloaded first.
@@ -327,9 +387,9 @@ To run a training job with Pipe input mode, pass in ``input_mode='Pipe'`` to your

  from sagemaker.tensorflow import TensorFlow

  tf_estimator = TensorFlow(entry_point='tf-train-with-pipemodedataset.py', role='SageMakerRole',
-                       training_steps=10000, evaluation_steps=100,
-                       train_instance_count=1, train_instance_type='ml.p2.xlarge',
-                       framework_version='1.10.0', input_mode='Pipe')
+                           training_steps=10000, evaluation_steps=100,
+                           train_instance_count=1, train_instance_type='ml.p2.xlarge',
+                           framework_version='1.10.0', input_mode='Pipe')

  tf_estimator.fit('s3://bucket/path/to/training/data')
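The README examples above reference a user-supplied ``tf-train.py`` that is not part of this commit. For orientation, a minimal Horovod entry point might look like the sketch below. This follows the standard ``horovod.tensorflow`` TF 1.x pattern (init, GPU pinning, ``DistributedOptimizer``, rank-0 broadcast); the toy regression model and step count are placeholders, not code from this repository:

    import numpy as np
    import tensorflow as tf
    import horovod.tensorflow as hvd

    hvd.init()  # one process per slot, launched by SageMaker's mpirun

    # Pin each process to a single local GPU (standard Horovod practice;
    # harmless on CPU-only instances).
    config = tf.ConfigProto()
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Toy model: fit y = 2x with a single weight.
    x = tf.constant(np.random.rand(100, 1), dtype=tf.float32)
    w = tf.Variable([[0.0]])
    loss = tf.reduce_mean((tf.matmul(x, w) - 2.0 * x) ** 2)

    opt = tf.train.GradientDescentOptimizer(0.1 * hvd.size())  # scale LR by world size
    opt = hvd.DistributedOptimizer(opt)                        # allreduce the gradients
    train_op = opt.minimize(loss, global_step=tf.train.get_or_create_global_step())

    hooks = [hvd.BroadcastGlobalVariablesHook(0),  # sync initial weights from rank 0
             tf.train.StopAtStepHook(last_step=200)]

    with tf.train.MonitoredTrainingSession(config=config, hooks=hooks) as sess:
        while not sess.should_stop():
            sess.run(train_op)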

src/sagemaker/tensorflow/estimator.py

Lines changed: 27 additions & 8 deletions

@@ -201,14 +201,21 @@ def __init__(self, training_steps=None, evaluation_steps=None, checkpoint_path=N
             script_mode (bool): If set to True, the estimator will use the Script Mode containers (default: False).
                 This will be ignored if py_version is set to 'py3'.
             distributions (dict): A dictionary with information on how to run distributed training
-                (default: None). Currently we only support distributed training with parameter servers. To enable it
-                use the following setup:
-                {
+                (default: None). Currently we support distributed training with parameter servers and MPI. To enable
+                parameter server use the following setup:
+                {
                     'parameter_server':
                     {
                         'enabled': True
                     }
                 }
+                To enable MPI:
+                {
+                    'mpi':
+                    {
+                        'enabled': True
+                    }
+                }
             **kwargs: Additional kwargs passed to the Framework constructor.
         """
         if framework_version is None:
@@ -420,13 +426,24 @@ def hyperparameters(self):
         hyperparameters = super(TensorFlow, self).hyperparameters()

         self.checkpoint_path = self.checkpoint_path or self._default_s3_path('checkpoints')
+        mpi_enabled = False

         if self._script_mode_enabled():
-            self.model_dir = self.model_dir or self._default_s3_path('model')
-            additional_hyperparameters = {'model_dir': self.model_dir}
+            additional_hyperparameters = {}
+
             if 'parameter_server' in self.distributions:
-                enabled = self.distributions['parameter_server'].get('enabled', False)
-                additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = enabled
+                ps_enabled = self.distributions['parameter_server'].get('enabled', False)
+                additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = ps_enabled
+
+            if 'mpi' in self.distributions:
+                mpi_dict = self.distributions['mpi']
+                mpi_enabled = mpi_dict.get('enabled', False)
+                additional_hyperparameters[self.LAUNCH_MPI_ENV_NAME] = mpi_enabled
+                additional_hyperparameters[self.MPI_NUM_PROCESSES_PER_HOST] = mpi_dict.get('processes_per_host', 1)
+                additional_hyperparameters[self.MPI_CUSTOM_MPI_OPTIONS] = mpi_dict.get('custom_mpi_options', '')
+
+            self.model_dir = self.model_dir or self._default_s3_path('model', mpi=mpi_enabled)
+            additional_hyperparameters['model_dir'] = self.model_dir
         else:
             additional_hyperparameters = {'checkpoint_path': self.checkpoint_path,
                                           'training_steps': self.training_steps,
@@ -436,10 +453,12 @@ def hyperparameters(self):
         hyperparameters.update(Framework._json_encode_hyperparameters(additional_hyperparameters))
         return hyperparameters

-    def _default_s3_path(self, directory):
+    def _default_s3_path(self, directory, mpi=False):
         local_code = get_config_value('local.local_code', self.sagemaker_session.config)
         if self.sagemaker_session.local_mode and local_code:
             return '/opt/ml/shared/{}'.format(directory)
+        elif mpi:
+            return '/opt/ml/model'
         else:
             return os.path.join(self.output_path, self._current_job_name, directory)
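Taken together with the Framework constants above, the effect of the new branch is easiest to see from the hyperparameters a job would carry. A hedged usage sketch; the commented values assume the JSON encoding applied by ``Framework._json_encode_hyperparameters``:

    from sagemaker.tensorflow import TensorFlow

    tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
                              train_instance_count=2, train_instance_type='ml.p3.2xlarge',
                              framework_version='1.12', py_version='py3',
                              distributions={'mpi': {'enabled': True,
                                                     'processes_per_host': 2}})

    # When the training job is created, its hyperparameters would include:
    #   'sagemaker_mpi_enabled': 'true'
    #   'sagemaker_mpi_num_of_processes_per_host': '2'
    #   'sagemaker_mpi_custom_mpi_options': '""'
    #   'model_dir': '"/opt/ml/model"'   (MPI jobs default to the local model dir)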

tests/data/horovod/launcher.sh (new file)

Lines changed: 3 additions & 0 deletions

#!/usr/bin/env bash

# Assumes the tensorflow/benchmarks repository is checked out under ./benchmarks
python benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py --num_batches=500 --model vgg16 --variable_update horovod --horovod_device gpu --use_fp16 --summary_verbosity 1 --save_summaries_steps 10 --train_dir /opt/ml/model --eval_dir /opt/ml/model --batch_size 32

tests/data/horovod/test_hvd_basic.py (new file)

Lines changed: 11 additions & 0 deletions

import json
import os

import horovod.tensorflow as hvd

hvd.init()

# Each MPI process records its Horovod rank and world size under /opt/ml/model,
# so the integration tests can verify that every rank actually ran.
with open(os.path.join('/opt/ml/model', 'rank-%s' % hvd.rank()), 'w') as f:
    basic_info = {'rank': hvd.rank(), 'size': hvd.size()}

    print(basic_info)
    json.dump(basic_info, f)
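For a run with two MPI processes, this script would leave files like the following under /opt/ml/model (illustrative values):

    # rank-0  ->  {"rank": 0, "size": 2}
    # rank-1  ->  {"rank": 1, "size": 2}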

tests/integ/test_horovod.py (new file)

Lines changed: 103 additions & 0 deletions

# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import json
import os
import tarfile
from six.moves.urllib.parse import urlparse

import boto3
import pytest

import tests.integ as integ
from sagemaker.tensorflow import TensorFlow
from tests.integ import timeout

horovod_dir = os.path.join(os.path.dirname(__file__), '..', 'data', 'horovod')


@pytest.mark.parametrize('instance_type', ['ml.c5.xlarge', 'ml.p3.2xlarge'])
def test_horovod(sagemaker_session, instance_type, tmpdir):
    estimator = TensorFlow(entry_point=os.path.join(horovod_dir, 'test_hvd_basic.py'),
                           role='SageMakerRole',
                           train_instance_count=2,
                           train_instance_type=instance_type,
                           sagemaker_session=sagemaker_session,
                           py_version=integ.PYTHON_VERSION,
                           script_mode=True,
                           framework_version='1.12',
                           distributions={'mpi': {'enabled': True}},
                           base_job_name='test-tf-horovod')

    with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
        estimator.fit()

    tmp = str(tmpdir)
    extract_files_from_s3(estimator.model_data, tmp)

    # Every MPI process should have written a rank file (see test_hvd_basic.py).
    for rank in range(2):
        assert read_json('rank-%s' % rank, tmp)['rank'] == rank


@pytest.mark.parametrize('instances, processes', [
    (1, 2),
    (2, 1),
    (2, 2)])
def test_horovod_local_mode(instances, processes, tmpdir):
    output_path = 'file://%s' % tmpdir

    estimator = TensorFlow(entry_point=os.path.join(horovod_dir, 'test_hvd_basic.py'),
                           role='SageMakerRole',
                           train_instance_count=instances,
                           train_instance_type='local',
                           py_version=integ.PYTHON_VERSION,
                           script_mode=True,
                           output_path=output_path,
                           framework_version='1.12',
                           distributions={'mpi': {'enabled': True,
                                                  'processes_per_host': processes}},
                           base_job_name='test-tf-horovod')

    with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
        estimator.fit()

    tmp = str(tmpdir)
    extract_files(output_path.replace('file://', ''), tmp)

    size = instances * processes

    for rank in range(size):
        assert read_json('rank-%s' % rank, tmp)['rank'] == rank


def extract_files(output_path, tmpdir):
    with tarfile.open(os.path.join(output_path, 'model.tar.gz')) as tar:
        tar.extractall(tmpdir)


def read_json(file, tmp):
    with open(os.path.join(tmp, file)) as f:
        return json.load(f)


def extract_files_from_s3(s3_url, tmpdir):
    parsed_url = urlparse(s3_url)
    s3 = boto3.resource('s3')

    model = os.path.join(tmpdir, 'model')
    s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip('/'), model)

    with tarfile.open(model, 'r') as tar_file:
        tar_file.extractall(tmpdir)

tests/integ/test_tf_script_mode.py

Lines changed: 3 additions & 2 deletions

@@ -23,7 +23,8 @@

 RESOURCE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'tensorflow_mnist')
 SCRIPT = os.path.join(RESOURCE_PATH, 'mnist.py')
-DISTRIBUTION_ENABLED = {'parameter_server': {'enabled': True}}
+PARAMETER_SERVER_DISTRIBUTION = {'parameter_server': {'enabled': True}}
+MPI_DISTRIBUTION = {'mpi': {'enabled': True}}


 @pytest.fixture(scope='session', params=['ml.c5.xlarge', 'ml.p2.xlarge'])
@@ -62,7 +63,7 @@ def test_mnist_distributed(sagemaker_session, instance_type):
                            py_version=integ.PYTHON_VERSION,
                            script_mode=True,
                            framework_version='1.11',
-                           distributions=DISTRIBUTION_ENABLED,
+                           distributions=PARAMETER_SERVER_DISTRIBUTION,
                            base_job_name='test-tf-sm-mnist')
     inputs = estimator.sagemaker_session.upload_data(
         path=os.path.join(RESOURCE_PATH, 'data'),
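Note that ``MPI_DISTRIBUTION`` is defined here but not used in the hunks shown above. A hypothetical companion test, mirroring ``test_mnist_distributed``, could consume it as follows (this sketch is not part of the commit, and the ``key_prefix`` is a guess rather than a value taken from the file):

    def test_mnist_mpi(sagemaker_session, instance_type):
        # Hypothetical sketch: exercises the Horovod/MPI code path via MPI_DISTRIBUTION.
        estimator = TensorFlow(entry_point=SCRIPT,
                               role='SageMakerRole',
                               train_instance_count=2,
                               train_instance_type=instance_type,
                               sagemaker_session=sagemaker_session,
                               py_version=integ.PYTHON_VERSION,
                               script_mode=True,
                               framework_version='1.12',
                               distributions=MPI_DISTRIBUTION,
                               base_job_name='test-tf-sm-mnist-mpi')
        inputs = estimator.sagemaker_session.upload_data(
            path=os.path.join(RESOURCE_PATH, 'data'),
            key_prefix='scriptmode/mnist')
        estimator.fit(inputs)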
