diff --git a/doc/api/training/sdp_versions/latest.rst b/doc/api/training/sdp_versions/latest.rst index ec8c3d7adf..25a3605d07 100644 --- a/doc/api/training/sdp_versions/latest.rst +++ b/doc/api/training/sdp_versions/latest.rst @@ -1,5 +1,5 @@ -Version 1.2.0 (Latest) +Version 1.1.2 (Latest) ====================== .. toctree:: diff --git a/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst b/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst index 52de6223d7..44d33c5f31 100644 --- a/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst +++ b/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst @@ -155,7 +155,7 @@ PyTorch API .. rubric:: Supported versions -**PyTorch 1.7.1, 1.8.1** +**PyTorch 1.7.1, 1.8.0** .. function:: smdistributed.dataparallel.torch.distributed.is_available() diff --git a/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst b/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst index e5ea3f2106..e4aa1c1521 100644 --- a/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst +++ b/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst @@ -157,7 +157,10 @@ TensorFlow API .. rubric:: Supported versions -**TensorFlow 2.4.1** +TensorFlow is supported in version 1.0.0 of ``sagemakerdistributed.dataparallel``. +Reference version 1.0.0 `TensorFlow API documentation +`_ +for supported TensorFlow versions. .. function:: smdistributed.dataparallel.tensorflow.init() diff --git a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst deleted file mode 100644 index b8216f6a72..0000000000 --- a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst +++ /dev/null @@ -1,533 +0,0 @@ -############################################################## -PyTorch Guide to SageMaker's distributed data parallel library -############################################################## - -.. admonition:: Contents - - - :ref:`pytorch-sdp-modify` - - :ref:`pytorch-sdp-api` - -.. _pytorch-sdp-modify: - :noindex: - -Modify a PyTorch training script to use SageMaker data parallel -====================================================================== - -The following steps show you how to convert a PyTorch training script to -utilize SageMaker's distributed data parallel library. - -The distributed data parallel library APIs are designed to be close to PyTorch Distributed Data -Parallel (DDP) APIs. -See `SageMaker distributed data parallel PyTorch examples `__ for additional details on how to implement the data parallel library -API offered for PyTorch. - - -- First import the distributed data parallel library’s PyTorch client and initialize it. You also import - the distributed data parallel library module for distributed training. - - .. code:: python - - import smdistributed.dataparallel.torch.distributed as dist - - from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP - - dist.init_process_group() - - -- Pin each GPU to a single distributed data parallel library process with ``local_rank`` - this - refers to the relative rank of the process within a given node. - ``smdistributed.dataparallel.torch.get_local_rank()`` API provides - you the local rank of the device. The leader node will be rank 0, and - the worker nodes will be rank 1, 2, 3, and so on. This is invoked in - the next code block as ``dist.get_local_rank()``. - - .. 
code:: python - - torch.cuda.set_device(dist.get_local_rank()) - - -- Then wrap the PyTorch model with the distributed data parallel library’s DDP. - - .. code:: python - - model = ... - # Wrap model with SageMaker's DistributedDataParallel - model = DDP(model) - - -- Modify the ``torch.utils.data.distributed.DistributedSampler`` to - include the cluster’s information. Set``num_replicas`` to the - total number of GPUs participating in training across all the nodes - in the cluster. This is called ``world_size``. You can get - ``world_size`` with - ``smdistributed.dataparallel.torch.get_world_size()`` API. This is - invoked in the following code as ``dist.get_world_size()``. Also - supply the node rank using - ``smdistributed.dataparallel.torch.get_rank()``. This is invoked as - ``dist.get_rank()``. - - .. code:: python - - train_sampler = DistributedSampler(train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank()) - - -- Finally, modify your script to save checkpoints only on the leader - node. The leader node will have a synchronized model. This also - avoids worker nodes overwriting the checkpoints and possibly - corrupting the checkpoints. - -.. code:: python - - if dist.get_rank() == 0: - torch.save(...) - - -All put together, the following is an example PyTorch training script -you will have for distributed training with the distributed data parallel library: - -.. code:: python - - # Import distributed data parallel library PyTorch API - import smdistributed.dataparallel.torch.distributed as dist - - # Import distributed data parallel library PyTorch DDP - from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP - - # Initialize distributed data parallel library - dist.init_process_group() - - class Net(nn.Module): -     ... -     # Define model - - def train(...): -     ... -     # Model training - - def test(...): -     ... -     # Model evaluation - - def main(): - -     # Scale batch size by world size -     batch_size //= dist.get_world_size() // 8 -     batch_size = max(batch_size, 1) - -     # Prepare dataset -     train_dataset = torchvision.datasets.MNIST(...) - -     # Set num_replicas and rank in DistributedSampler -     train_sampler = torch.utils.data.distributed.DistributedSampler( -             train_dataset, -             num_replicas=dist.get_world_size(), -             rank=dist.get_rank()) - -     train_loader = torch.utils.data.DataLoader(..) - -     # Wrap the PyTorch model with distributed data parallel library’s DDP -     model = DDP(Net().to(device)) - -     # Pin each GPU to a single distributed data parallel library process. -     torch.cuda.set_device(local_rank) -     model.cuda(local_rank) - -     # Train -     optimizer = optim.Adadelta(...) -     scheduler = StepLR(...) -     for epoch in range(1, args.epochs + 1): -         train(...) -         if rank == 0: -             test(...) -         scheduler.step() - -     # Save model on master node. -     if dist.get_rank() == 0: -         torch.save(...) - - if __name__ == '__main__': -     main() - - -.. _pytorch-sdp-api-1.1.x: - :noindex: - -PyTorch API -=========== - -.. rubric:: Supported versions - -**PyTorch 1.7.1, 1.8.0** - - -.. function:: smdistributed.dataparallel.torch.distributed.is_available() - :noindex: - - Check if script started as a distributed job. For local runs user can - check that is_available returns False and run the training script - without calls to ``smdistributed.dataparallel``. 
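The guard described above lets one script run both as a SageMaker distributed job and as a plain local run. The sketch below is illustrative only; the local fallback branch is an assumption and not part of this API's documentation.

.. code:: python

    import torch
    import smdistributed.dataparallel.torch.distributed as dist

    if dist.is_available():
        # Launched as a SageMaker distributed data parallel job
        dist.init_process_group()
        torch.cuda.set_device(dist.get_local_rank())  # pin this process to one GPU
        device = torch.device("cuda")
    else:
        # Plain local run: no further smdistributed.dataparallel calls needed
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")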
- - **Inputs:** - - - ``None`` - - **Returns:** - - - ``True`` if started as a distributed job, ``False`` otherwise - - -.. function:: smdistributed.dataparallel.torch.distributed.init_process_group(*args, **kwargs) - :noindex: - - Initialize ``smdistributed.dataparallel``. Must be called at the - beginning of the training script, before calling any other methods. - ​ - Process group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with ``torch.distributed`` only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - ​ - After this - call, ``smdistributed.dataparallel.torch.distributed.is_initialized()`` will - return ``True``. - ​ - - **Inputs:** - - - ``None`` - - **Returns:** - - - ``None`` - - -.. function:: smdistributed.dataparallel.torch.distributed.is_initialized() - :noindex: - - Checks if the default process group has been initialized. - - **Inputs:** - - - ``None`` - - **Returns:** - - - ``True`` if initialized, else ``False``. - - -.. function:: smdistributed.dataparallel.torch.distributed.get_world_size(group=smdistributed.dataparallel.torch.distributed.group.WORLD) - :noindex: - - The total number of GPUs across all the nodes in the cluster. For - example, in a 8 node cluster with 8 GPU each, size will be equal to 64. - - **Inputs:** - - - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - **Returns:** - - - An integer scalar containing the total number of GPUs in the training - job, across all nodes in the cluster. - - -.. function:: smdistributed.dataparallel.torch.distributed.get_rank(group=smdistributed.dataparallel.torch.distributed.group.WORLD) - :noindex: - - The rank of the node in the cluster. The rank ranges from 0 to number of - nodes - 1. This is similar to MPI's World Rank. - - - **Inputs:** - - - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - **Returns:** - - - An integer scalar containing the rank of the worker node. - - -.. function:: smdistributed.dataparallel.torch.distributed.get_local_rank() - :noindex: - - Local rank refers to the relative rank of - the ``smdistributed.dataparallel`` process within the node the current - process is running on. For example, if a node contains 8 GPUs, it has - 8 ``smdistributed.dataparallel`` processes. Each process has - a ``local_rank`` ranging from 0 to 7. - - **Inputs:** - - - ``None`` - - **Returns:** - - - An integer scalar containing the rank of the GPU and - its ``smdistributed.dataparallel`` process. - - -.. function:: smdistributed.dataparallel.torch.distributed.all_reduce(tensor, op=smdistributed.dataparallel.torch.distributed.ReduceOp.SUM, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) - :noindex: - - Performs an all-reduce operation on a tensor (torch.tensor) across - all ``smdistributed.dataparallel`` workers - - ``smdistributed.dataparallel`` AllReduce API can be used for all - reducing gradient tensors or any other tensors.  
By - default, ``smdistributed.dataparallel`` AllReduce reduces the tensor - data across all ``smdistributed.dataparallel`` workers in such a way - that all get the final result. - - After the call ``tensor`` is going to be bitwise identical in all - processes. - - **Inputs:** - - - ``tensor (torch.tensor) (required):`` Input and output of the collective. The function operates in-place. - - - ``op (smdistributed.dataparallel.torch.distributed.ReduceOp) (optional)``: The reduction operation to combine tensors across different ranks.  Defaults to ``SUM`` if None is given. - - * Supported ops: ``AVERAGE``, ``SUM``, ``MIN``, ``MAX`` - - - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process group is not supported in ``smdistributed.dataparallel``. This parameter exists for API parity with torch.distributed only. - - * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - - ``async_op (bool) (optional):`` Whether this op should be an async op. Defaults to ``False``. - - **Returns:** - - - Async op work handle, if async_op is set to True. ``None``, - otherwise. - - .. rubric:: Notes - - ``smdistributed.dataparallel.torch.distributed.allreduce``, in most - cases, is ~2X slower than all-reducing - with ``smdistributed.dataparallel.torch.parallel.distributed.DistributedDataParallel`` and - hence, it is not recommended to be used for performing gradient - reduction during the training - process. ``smdistributed.dataparallel.torch.distributed.allreduce`` internally - uses NCCL AllReduce with ``ncclSum`` as the reduction operation. - - -.. function:: smdistributed.dataparallel.torch.distributed.broadcast(tensor, src=0, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) - :noindex: - - Broadcasts the tensor (torch.tensor) to the whole group. - - ``tensor`` must have the same number of elements as GPUs in the - cluster. - - **Inputs:** - - - ``tensor (torch.tensor)(required)`` - - - ``src (int)(optional)`` - - - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process group is not supported in ``smdistributed.dataparallel``. This parameter exists for API parity with ``torch.distributed`` only. - - * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - - ``async_op (bool)(optional):`` Whether this op should be an async op. Defaults to ``False``. - - **Returns:** - - - Async op work handle, if async_op is set to True. ``None``, otherwise. - - -.. function:: smdistributed.dataparallel.torch.distributed.all_gather(tensor_list, tensor, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) - :noindex: - - Gathers tensors from the whole group in a list. - - - **Inputs:** - - - ``tensor_list (list[torch.tensor])(required):`` Output list. It - should contain correctly-sized tensors to be used for output of the - collective. - - ``tensor (torch.tensor)(required):`` Tensor to be broadcast from - current process. - - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - ``async_op (bool)(optional):`` Whether this op should be an async op. - Defaults to ``False``. - - **Returns:** - - - Async op work handle, if async_op is set to True. ``None``, - otherwise. - - -.. 
function:: smdistributed.dataparallel.torch.distributed.all_to_all_single(output_t, input_t, output_split_sizes=None, input_split_sizes=None, group=group.WORLD, async_op=False) - :noindex: - - Each process scatters input tensor to all processes in a group and return gathered tensor in output. - - **Inputs:** - - - output_t - - input_t - - output_split_sizes - - input_split_sizes - - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - ``async_op (bool)(optional):`` Whether this op should be an async op. - Defaults to ``False``. - - **Returns:** - - - Async op work handle, if async_op is set to True. ``None``, - otherwise. - - -.. function:: smdistributed.dataparallel.torch.distributed.barrier(group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) - :noindex: - - Synchronizes all ``smdistributed.dataparallel`` processes. - - **Inputs:** - - - tensor (torch.tensor)(required): Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise. - - - src (int)(optional): Source rank. - - - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. - - * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` - - - ``async_op (bool)(optional):`` Whether this op should be an async op. - Defaults to ``False``. - - **Returns:** - - - Async op work handle, if async_op is set to True. ``None``, - otherwise. - - -.. class:: smdistributed.dataparallel.torch.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, broadcast_buffers=True, process_group=None, bucket_cap_mb=None) - :noindex: - - ``smdistributed.dataparallel's`` implementation of distributed data - parallelism for PyTorch. In most cases, wrapping your PyTorch Module - with ``smdistributed.dataparallel's`` ``DistributedDataParallel (DDP)`` is - all you need to do to use ``smdistributed.dataparallel``. - - Creation of this DDP class requires ``smdistributed.dataparallel`` - already initialized - with ``smdistributed.dataparallel.torch.distributed.init_process_group()``. - - This container parallelizes the application of the given module by - splitting the input across the specified devices by chunking in the - batch dimension. The module is replicated on each machine and each - device, and each such replica handles a portion of the input. During the - backwards pass, gradients from each node are averaged. - - The batch size should be larger than the number of GPUs used locally. - ​ - Example usage - of ``smdistributed.dataparallel.torch.parallel.DistributedDataParallel``: - - .. code:: python - - import torch - import smdistributed.dataparallel.torch.distributed as dist - from smdistributed.dataparallel.torch.parallel import DistributedDataParallel as DDP - - dist.init_process_group() - - # Pin GPU to be used to process local rank (one GPU per process) - torch.cuda.set_device(dist.get_local_rank()) - - # Build model and optimizer - model = ... 
- optimizer = torch.optim.SGD(model.parameters(), -                             lr=1e-3 * dist.get_world_size()) - # Wrap model with smdistributed.dataparallel's DistributedDataParallel - model = DDP(model) - - **Parameters:** - - - ``module (torch.nn.Module)(required):`` PyTorch NN Module to be - parallelized - - ``device_ids (list[int])(optional):`` CUDA devices. This should only - be provided when the input module resides on a single CUDA device. - For single-device modules, - the ``ith module replica is placed on device_ids[i]``. For - multi-device modules and CPU modules, device_ids must be None or an - empty list, and input data for the forward pass must be placed on the - correct device. Defaults to ``None``. - - ``output_device (int)(optional):`` Device location of output for - single-device CUDA modules. For multi-device modules and CPU modules, - it must be None, and the module itself dictates the output location. - (default: device_ids[0] for single-device modules).  Defaults - to ``None``. - - ``broadcast_buffers (bool)(optional):`` Flag that enables syncing - (broadcasting) buffers of the module at beginning of the forward - function. ``smdistributed.dataparallel`` does not support broadcast - buffer yet. Please set this to ``False``. - - ``process_group(smdistributed.dataparallel.torch.distributed.group)(optional):`` Process - group is not supported in ``smdistributed.dataparallel``. This - parameter exists for API parity with torch.distributed only. Only - supported value is - ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` Defaults - to ``None.`` - - ``bucket_cap_mb (int)(optional):`` DistributedDataParallel will - bucket parameters into multiple buckets so that gradient reduction of - each bucket can potentially overlap with backward - computation. ``bucket_cap_mb`` controls the bucket size in - MegaBytes (MB) (default: 25). - - .. rubric:: Notes - - - This module assumes all parameters are registered in the model by the - time it is created. No parameters should be added nor removed later. - - This module assumes all parameters are registered in the model of - each distributed processes are in the same order. The module itself - will conduct gradient all-reduction following the reverse order of - the registered parameters of the model. In other words, it is users’ - responsibility to ensure that each distributed process has the exact - same model and thus the exact same parameter registration order. - - You should never change the set of your model’s parameters after - wrapping up your model with DistributedDataParallel. In other words, - when wrapping up your model with DistributedDataParallel, the - constructor of DistributedDataParallel will register the additional - gradient reduction functions on all the parameters of the model - itself at the time of construction. If you change the model’s - parameters after the DistributedDataParallel construction, this is - not supported and unexpected behaviors can happen, since some - parameters’ gradient reduction functions might not get called. - - -.. class:: smdistributed.dataparallel.torch.distributed.ReduceOp - :noindex: - - An enum-like class for supported reduction operations - in ``smdistributed.dataparallel``. - - The values of this class can be accessed as attributes, for - example, ``ReduceOp.SUM``. They are used in specifying strategies for - reduction collectives such as -  ``smdistributed.dataparallel.torch.distributed.all_reduce(...)``. 
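As a minimal sketch of specifying such a strategy (the tensor contents and the choice of ``MAX`` are illustrative assumptions):

.. code:: python

    import torch
    import smdistributed.dataparallel.torch.distributed as dist

    dist.init_process_group()
    local_metric = 0.95  # e.g. this worker's validation accuracy (illustrative value)

    # Reduce the scalar to its global maximum across all workers; operates in place
    metric = torch.tensor([local_metric]).cuda()
    dist.all_reduce(metric, op=dist.ReduceOp.MAX)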
- - - ``AVERAGE`` - - ``SUM`` - - ``MIN`` - - ``MAX`` diff --git a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst deleted file mode 100644 index d8b6fdf960..0000000000 --- a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst +++ /dev/null @@ -1,558 +0,0 @@ -################################################################# -TensorFlow Guide to SageMaker's distributed data parallel library -################################################################# - -.. admonition:: Contents - - - :ref:`tensorflow-sdp-modify` - - :ref:`tensorflow-sdp-api` - -.. _tensorflow-sdp-modify: - :noindex: - -Modify a TensorFlow 2.x training script to use SageMaker data parallel -====================================================================== - -The following steps show you how to convert a TensorFlow 2.x training -script to utilize the distributed data parallel library. - -The distributed data parallel library APIs are designed to be close to Horovod APIs. -See `SageMaker distributed data parallel TensorFlow examples -`__ -for additional details on how to implement the data parallel library. - -- First import the distributed data parallel library’s TensorFlow client and initialize it: - - .. code:: python - - import smdistributed.dataparallel.tensorflow as sdp - sdp.init() - - -- Pin each GPU to a single smdistributed.dataparallel process - with ``local_rank`` - this refers to the relative rank of the - process within a given node. ``sdp.tensorflow.local_rank()`` API - provides you the local rank of the device. The leader node will be - rank 0, and the worker nodes will be rank 1, 2, 3, and so on. This is - invoked in the next code block as ``sdp.local_rank()``. - ``set_memory_growth`` is not directly related to SMD, but must be set - for distributed training with TensorFlow. - - .. code:: python - - gpus = tf.config.experimental.list_physical_devices('GPU') - for gpu in gpus: -     tf.config.experimental.set_memory_growth(gpu, True) - if gpus: -     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') - - -- Scale the learning rate by the number of workers. - ``sdp.tensorflow.size()`` API provides you number of workers in the - cluster. This is invoked in the next code block as ``sdp.size()``. - - .. code:: python - - learning_rate = learning_rate * sdp.size() - - -- Use the library’s ``DistributedGradientTape`` to optimize AllReduce - operations during training. This wraps ``tf.GradientTape``. - - .. code:: python - - with tf.GradientTape() as tape: -       output = model(input) -       loss_value = loss(label, output) - - # Wrap tf.GradientTape with the library's DistributedGradientTape - tape = sdp.DistributedGradientTape(tape) - - -- Broadcast initial model variables from the leader node (rank 0) to - all the worker nodes (ranks 1 through n). This is needed to ensure a - consistent initialization across all the worker ranks. For this, you - use ``sdp.tensorflow.broadcast_variables`` API after the - model and optimizer variables are initialized. This is invoked in the - next code block as ``sdp.broadcast_variables()``. - - .. code:: python - - sdp.broadcast_variables(model.variables, root_rank=0) - sdp.broadcast_variables(opt.variables(), root_rank=0) - - -- Finally, modify your script to save checkpoints only on the leader - node. The leader node will have a synchronized model. 
This also - avoids worker nodes overwriting the checkpoints and possibly - corrupting the checkpoints. - - .. code:: python - - if sdp.rank() == 0: -     checkpoint.save(checkpoint_dir) - - -All put together, the following is an example TensorFlow2 training -script you will have for distributed training with the library. - -.. code:: python - - import tensorflow as tf - - # Import the library's TF API - import smdistributed.dataparallel.tensorflow as sdp - - # Initialize the library - sdp.init() - - gpus = tf.config.experimental.list_physical_devices('GPU') - for gpu in gpus: -     tf.config.experimental.set_memory_growth(gpu, True) - if gpus: -     # Pin GPUs to a single process -     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') - - # Prepare Dataset - dataset = tf.data.Dataset.from_tensor_slices(...) - - # Define Model - mnist_model = tf.keras.Sequential(...) - loss = tf.losses.SparseCategoricalCrossentropy() - - # Scale Learning Rate - # LR for 8 node run : 0.000125 - # LR for single node run : 0.001 - opt = tf.optimizers.Adam(0.000125 * sdp.size()) - - @tf.function - def training_step(images, labels, first_batch): -     with tf.GradientTape() as tape: -         probs = mnist_model(images, training=True) -         loss_value = loss(labels, probs) - -     # Wrap tf.GradientTape with the library's DistributedGradientTape -     tape = sdp.DistributedGradientTape(tape) - -     grads = tape.gradient(loss_value, mnist_model.trainable_variables) -     opt.apply_gradients(zip(grads, mnist_model.trainable_variables)) - -     if first_batch: -        # Broadcast model and optimizer variables -        sdp.broadcast_variables(mnist_model.variables, root_rank=0) -        sdp.broadcast_variables(opt.variables(), root_rank=0) - -     return loss_value - - ... - - # Save checkpoints only from master node. - if sdp.rank() == 0: -     checkpoint.save(checkpoint_dir) - - -.. _tensorflow-sdp-api: - :noindex: - -TensorFlow API -============== - -.. rubric:: Supported versions - -TensorFlow is supported in version 1.0.0 of ``sagemakerdistributed.dataparallel``. -Reference version 1.0.0 `TensorFlow API documentation -`_ -for supported TensorFlow versions. - -.. function:: smdistributed.dataparallel.tensorflow.init() - :noindex: - - Initialize ``smdistributed.dataparallel``. Must be called at the - beginning of the training script. - - - **Inputs:** - - - ``None`` - - **Returns:** - - - ``None`` - - - .. rubric:: Notes - - ``init()`` needs to be called only once. It will throw an error if - called more than once: - - ``init() called more than once. smdistributed.dataparallel is already initialized.`` - - -.. function:: smdistributed.dataparallel.tensorflow.size() - :noindex: - - The total number of GPUs across all the nodes in the cluster. For - example, in a 8 node cluster with 8 GPUs each, ``size`` will be equal - to 64. - - - **Inputs:** - - - ``None`` - - **Returns:** - - - An integer scalar containing the total number of GPUs, across all - nodes in the cluster. - - -.. function:: smdistributed.dataparallel.tensorflow.local_size() - :noindex: - - The total number of GPUs on a node. For example, on a node with 8 - GPUs, ``local_size`` will be equal to 8. - - **Inputs:** - - - ``None`` - - **Returns:** - - - An integer scalar containing the total number of GPUs on itself. - - -.. function:: smdistributed.dataparallel.tensorflow.rank() - :noindex: - - The rank of the node in the cluster. The rank ranges from 0 to number of - nodes - 1. This is similar to MPI's World Rank. 
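Example usage (a minimal sketch; the model, checkpoint object, and output path are illustrative assumptions), echoing the checkpoint guard shown earlier in this guide:

.. code:: python

    import tensorflow as tf
    import smdistributed.dataparallel.tensorflow as sdp

    sdp.init()
    model = tf.keras.Sequential([tf.keras.layers.Dense(10)])  # placeholder model
    checkpoint = tf.train.Checkpoint(model=model)

    # Only the leader (rank 0) writes checkpoints, so workers never overwrite them
    if sdp.rank() == 0:
        checkpoint.save("./checkpoints/ckpt")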
- - **Inputs:** - - - ``None`` - - **Returns:** - - - An integer scalar containing the rank of the node. - - -.. function:: smdistributed.dataparallel.tensorflow.local_rank() - :noindex: - - Local rank refers to the relative rank of the - GPUs’ ``smdistributed.dataparallel`` processes within the node. For - example, if a node contains 8 GPUs, it has - 8 ``smdistributed.dataparallel`` processes, then each process will - get a local rank ranging from 0 to 7. - - **Inputs:** - - - ``None`` - - **Returns:** - - - An integer scalar containing the rank of the GPU and - its ``smdistributed.dataparallel`` process. - - -.. function:: smdistributed.dataparallel.tensorflow.allreduce(tensor, param_index, num_params, compression=Compression.none, op=ReduceOp.AVERAGE) - :noindex: - - Performs an all-reduce operation on a tensor (``tf.Tensor``). - - ``smdistributed.dataparallel`` AllReduce API can be used for all - reducing gradient tensors or any other tensors. By - default, ``smdistributed.dataparallel`` AllReduce averages the - tensors across the participating workers. - ​ - **Inputs:** - - - ``tensor (tf.Tensor)(required)``: The tensor to be all-reduced. The shape of the input must be identical across all ranks. - - ``param_index (int)(required):`` 0 if you are reducing a single tensor. Index of the tensor if you are reducing a list of tensors. - - ``num_params (int)(required):`` len(tensor). - - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. - - *  Supported compression types - ``none``, ``fp16`` - - - ``op (optional)(smdistributed.dataparallel.tensorflow.ReduceOp)``: The reduction operation to combine tensors across different ranks. Defaults to ``Average`` if None is given. - - * Supported ops: ``SUM``, ``MIN``, ``MAX``, ``AVERAGE`` - - **Returns:** - - - A tensor of the same shape and type as input ``tensor``, all-reduced across all the processes. - - -.. function:: smdistributed.dataparallel.tensorflow.broadcast_global_variables(root_rank) - :noindex: - - Broadcasts all global variables from root rank to all other processes. - - **Inputs:** - - - ``root_rank (int)(required):`` Rank of the process from which global - variables will be broadcasted to all other processes. - - **Returns:** - - - ``None`` - - -.. function:: smdistributed.dataparallel.tensorflow.broadcast_variables(variables, root_rank) - :noindex: - - Applicable for TensorFlow 2.x only. - ​ - Broadcasts variables from root rank to all other processes. - ​ - With TensorFlow 2.x, ``broadcast_variables`` is used to - broadcast ``model.variables`` and ``optimizer.variables`` post - initialization from the leader node to all the worker nodes. This - ensures a consistent initialization across all the worker ranks. - - **Inputs:** - - - ``variables (tf.Variable)(required):`` Variables to be broadcasted. - - ``root_rank (int)(required):`` Rank of the process from which - variables will be broadcasted to all other processes. - - **Returns:** - - - ``None`` - - -.. function:: smdistributed.dataparallel.tensorflow.oob_allreduce(tensor, compression=Compression.none, op=ReduceOp.AVERAGE) - :noindex: - - OutOfBand (oob) AllReduce is simplified AllReduce function for use cases - such as calculating total loss across all the GPUs in the training. - oob_allreduce average the tensors, as reduction operation, across the - worker nodes. 
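A minimal sketch of the loss-aggregation use case described above; treating the reduced tensor as the return value is an assumption here, not something this page documents:

.. code:: python

    import tensorflow as tf
    import smdistributed.dataparallel.tensorflow as sdp

    sdp.init()
    local_loss = tf.constant(0.42)  # this worker's batch loss (illustrative value)

    # Average the loss across all workers, outside the gradient AllReduce path
    avg_loss = sdp.oob_allreduce(local_loss)  # assumes the averaged tensor is returned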
- - **Inputs:** - - - ``tensor (tf.Tensor)(required)``: The tensor to be all-reduced. The shape of the input must be identical across all worker nodes. - - ``compression`` (optional): Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. - - *  Supported compression types - ``none``, ``fp16`` - - - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different worker nodes. Defaults to ``Average`` if None is given. - - * Supported ops: ``AVERAGE`` - - **Returns:** - - - ``None`` - - .. rubric:: Notes - - ``smdistributed.dataparallel.tensorflow.oob_allreduce``, in most - cases, is ~2x slower - than ``smdistributed.dataparallel.tensorflow.allreduce``  so it is not - recommended to be used for performing gradient reduction during the - training - process. ``smdistributed.dataparallel.tensorflow.oob_allreduce`` internally - uses NCCL AllReduce with ``ncclSum`` as the reduction operation. - - -.. function:: smdistributed.dataparallel.tensorflow.overlap(tensor) - :noindex: - - This function is applicable only for models compiled with XLA. Use this - function to enable ``smdistributed.dataparallel`` to efficiently - overlap backward pass with the all reduce operation. - - Example usage: - - .. code:: python - - layer = tf.nn.dropout(...) # Or any other layer - layer = smdistributed.dataparallel.tensorflow.overlap(layer) - - The overlap operation is inserted into the TF graph as a node. It - behaves as an identity operation, and helps in achieving the - communication overlap with backward pass operation. - - **Inputs:** - - - ``tensor (tf.Tensor)(required):`` The tensor to be all-reduced. - - **Returns:** - - - ``None`` - - .. rubric:: Notes - - This operation helps in speeding up distributed training, as - the AllReduce operation does not have to wait for all the gradients to - be ready. Backward propagation proceeds sequentially from the output - layer of the network to the input layer. When the gradient computation - for a layer finishes, ``smdistributed.dataparallel`` adds them to a - fusion buffer. As soon as the size of the fusion buffer reaches a - predefined threshold (25 Mb), ``smdistributed.dataparallel`` starts - the AllReduce operation. - - -.. function:: smdistributed.dataparallel.tensorflow.broadcast(tensor, root_rank) - :noindex: - - Broadcasts the input tensor on root rank to the same input tensor on all - other ``smdistributed.dataparallel`` processes. - ​ - The broadcast will not start until all processes are ready to send and - receive the tensor. - - **Inputs:** - - - ``tensor (tf.Tensor)(required):`` The tensor to be broadcasted. - - ``root_rank (int)(required):`` Rank of the process from which - tensor will be broadcasted to all other processes. - - **Returns:** - - - A tensor of the same shape and type as tensor, with the value - broadcasted from root rank. - - -.. function:: smdistributed.dataparallel.tensorflow.shutdown() - :noindex: - - Shuts down ``smdistributed.dataparallel``. Optional to call at the end - of the training script. - - **Inputs:** - - - ``None`` - - **Returns:** - - - ``None`` - - -.. function:: smdistributed.dataparallel.tensorflow.DistributedOptimizer - :noindex: - - Applicable if you use the ``tf.estimator`` API in TensorFlow 2.x (2.3.1). 
- ​ - Construct a new ``DistributedOptimizer`` , which uses TensorFlow - optimizer under the hood for computing single-process gradient values - and applying gradient updates after the gradient values have been - combined across all ``smdistributed.dataparallel`` workers. - ​ - Example usage: - - .. code:: python - - opt = ... # existing optimizer from tf.train package or your custom optimizer - opt = smdistributed.dataparallel.tensorflow.DistributedOptimizer(opt) - - - - ``optimizer (tf.train.Optimizer)(required):`` TF Optimizer to use for computing gradients and applying updates. - - - ``name (str)(optional):`` Name prefix for the operations created when applying gradients. Defaults to ``smdistributed.dataparallel`` followed by provided optimizer type. - - - ``use_locking (bool)(optional):`` Whether to use locking when updating variables. Defaults to ``False``. - - - ``device_dense:`` Not supported. Raises not supported error. - - - ``device_sparse:`` Not supported. Raises not supported error. - - - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. - - *  Supported compression types - ``none``, ``fp16`` - - - ``sparse_as_dense:`` Not supported. Raises not supported error. - - - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different ranks. Defaults to ``Average`` if None is given. - - * Supported ops: ``AVERAGE`` - - - ``bucket_cap_mb (int)(optional):`` Size of ``smdistributed.dataparallel`` fusion buffer size. Defaults to 25MB that works optimally for most case. If you provide a value, expects the (value * 1024 * 1024) i.e., bytes to be multiple of 128. - - -.. function:: smdistributed.dataparallel.tensorflow.DistributedGradientTape - :noindex: - - Applicable to TensorFlow 2.x only. - - Construct a new ``DistributedGradientTape``, which uses - TensorFlow’s ``GradientTape`` under the hood, using an AllReduce to - combine gradient values before applying gradients to model weights. - ​ - Example Usage: - - .. code:: python - - with tf.GradientTape() as tape: -       output = model(input) -       loss_value = loss(label, output) - - # Wrap in smdistributed.dataparallel's DistributedGradientTape - tape = smdistributed.dataparallel.tensorflow.DistributedGradientTape(tape) - - - - ``gradtape (tf.GradientTape)(required):`` GradientTape to use for computing gradients and applying updates. - - - ``device_dense:`` Not supported. Raises not supported error. - - - ``device_sparse:`` Not supported. Raises not supported error. - - - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. - - *  Supported compression types - ``none``, ``fp16`` - - - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different ranks. Defaults to ``Average`` if None is given. - - * Supported ops: ``AVERAGE`` - - -.. function:: smdistributed.dataparallel.tensorflow.BroadcastGlobalVariablesHook - :noindex: - - Applicable if you use the ``tf.estimator`` API in TensorFlow 2.x (2.3.1). - - - ``SessionRunHook`` that will broadcast all global variables from root - rank to all other processes during initialization. 
- ​ - This is necessary to ensure consistent initialization of all workers - when training is started with random weights or restored from a - checkpoint. - ​ - Example Usage: - - .. code:: python - - hooks = [smdistributed.dataparallel.tensorflow.BroadcastGlobalVariablesHook(root_rank=0)] - ... - with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir, -                                        hooks=hooks, -                                        config=config) as mon_sess: -      ... - - - - ``root_rank (int)(required):`` Rank of the process from which global - variables will be broadcasted to all other processes. - - -.. function:: smdistributed.dataparallel.tensorflow.Compression - :noindex: - - Optional Gradient Compression algorithm that can be used in AllReduce - operation. - - - ``none``: alias for ``NoneCompression``. Do not compression gradient - tensors. - - ``fp16``: alias for ``FP16Compression``. Compress the floating point - gradient tensors to 16-bit (FP16) - - -.. function:: smdistributed.dataparallel.tensorflow.ReduceOp - :noindex: - - Supported reduction operations in ``smdistributed.dataparallel``. - - - ``AVERAGE`` - - ``SUM`` - - ``MIN`` - - ``MAX`` diff --git a/doc/api/training/sdp_versions/v1_1_x.rst b/doc/api/training/sdp_versions/v1_1_x.rst deleted file mode 100644 index 90c3a984a7..0000000000 --- a/doc/api/training/sdp_versions/v1_1_x.rst +++ /dev/null @@ -1,9 +0,0 @@ - -Version 1.1.x -============= - -.. toctree:: - :maxdepth: 1 - - v1.1.x/smd_data_parallel_pytorch.rst - v1.1.x/smd_data_parallel_tensorflow.rst diff --git a/doc/api/training/smd_data_parallel.rst b/doc/api/training/smd_data_parallel.rst index 8c76cd2f46..5092395585 100644 --- a/doc/api/training/smd_data_parallel.rst +++ b/doc/api/training/smd_data_parallel.rst @@ -85,7 +85,6 @@ Select a version to see the API documentation for version. :maxdepth: 1 sdp_versions/latest.rst - sdp_versions/v1_1_x.rst sdp_versions/v1_0_0.rst .. important:: diff --git a/doc/api/training/smd_data_parallel_release_notes/smd_data_parallel_change_log.md b/doc/api/training/smd_data_parallel_release_notes/smd_data_parallel_change_log.md index ecb67e1c26..a36f100337 100644 --- a/doc/api/training/smd_data_parallel_release_notes/smd_data_parallel_change_log.md +++ b/doc/api/training/smd_data_parallel_release_notes/smd_data_parallel_change_log.md @@ -1,16 +1,3 @@ -# Sagemaker Distributed Data Parallel 1.2.0 Release Notes - -* New features -* Bug Fixes - -*New features:* - -* Support of [EFA network interface](https://aws.amazon.com/hpc/efa/) for distributed AllReduce. For best performance, it is recommended you use an instance type that supports Amazon Elastic Fabric Adapter (ml.p3dn.24xlarge and ml.p4d.24xlarge) when you train a model using SageMaker distributed data parallel. - -*Bug Fixes:* - -* Improved performance on single node and small clusters. - # Sagemaker Distributed Data Parallel 1.1.2 Release Notes * Bug Fixes