diff --git a/doc/api/training/sdp_versions/latest.rst b/doc/api/training/sdp_versions/latest.rst index 25a3605d07..ec8c3d7adf 100644 --- a/doc/api/training/sdp_versions/latest.rst +++ b/doc/api/training/sdp_versions/latest.rst @@ -1,5 +1,5 @@ -Version 1.1.2 (Latest) +Version 1.2.0 (Latest) ====================== .. toctree:: diff --git a/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst b/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst index 44d33c5f31..52de6223d7 100644 --- a/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst +++ b/doc/api/training/sdp_versions/latest/smd_data_parallel_pytorch.rst @@ -155,7 +155,7 @@ PyTorch API .. rubric:: Supported versions -**PyTorch 1.7.1, 1.8.0** +**PyTorch 1.7.1, 1.8.1** .. function:: smdistributed.dataparallel.torch.distributed.is_available() diff --git a/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst b/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst index e4aa1c1521..e5ea3f2106 100644 --- a/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst +++ b/doc/api/training/sdp_versions/latest/smd_data_parallel_tensorflow.rst @@ -157,10 +157,7 @@ TensorFlow API .. rubric:: Supported versions -TensorFlow is supported in version 1.0.0 of ``sagemakerdistributed.dataparallel``. -Reference version 1.0.0 `TensorFlow API documentation -`_ -for supported TensorFlow versions. +**TensorFlow 2.4.1** .. function:: smdistributed.dataparallel.tensorflow.init() diff --git a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst new file mode 100644 index 0000000000..b8216f6a72 --- /dev/null +++ b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_pytorch.rst @@ -0,0 +1,533 @@ +############################################################## +PyTorch Guide to SageMaker's distributed data parallel library +############################################################## + +.. admonition:: Contents + + - :ref:`pytorch-sdp-modify` + - :ref:`pytorch-sdp-api` + +.. _pytorch-sdp-modify: + :noindex: + +Modify a PyTorch training script to use SageMaker data parallel +====================================================================== + +The following steps show you how to convert a PyTorch training script to +utilize SageMaker's distributed data parallel library. + +The distributed data parallel library APIs are designed to be close to PyTorch Distributed Data +Parallel (DDP) APIs. +See `SageMaker distributed data parallel PyTorch examples `__ for additional details on how to implement the data parallel library +API offered for PyTorch. + + +- First import the distributed data parallel library’s PyTorch client and initialize it. You also import + the distributed data parallel library module for distributed training. + + .. code:: python + + import smdistributed.dataparallel.torch.distributed as dist + + from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP + + dist.init_process_group() + + +- Pin each GPU to a single distributed data parallel library process with ``local_rank`` - this + refers to the relative rank of the process within a given node. + ``smdistributed.dataparallel.torch.get_local_rank()`` API provides + you the local rank of the device. The leader node will be rank 0, and + the worker nodes will be rank 1, 2, 3, and so on. This is invoked in + the next code block as ``dist.get_local_rank()``. + + .. 
code:: python + + torch.cuda.set_device(dist.get_local_rank()) + + +- Then wrap the PyTorch model with the distributed data parallel library’s DDP. + + .. code:: python + + model = ... + # Wrap model with SageMaker's DistributedDataParallel + model = DDP(model) + + +- Modify the ``torch.utils.data.distributed.DistributedSampler`` to + include the cluster’s information. Set``num_replicas`` to the + total number of GPUs participating in training across all the nodes + in the cluster. This is called ``world_size``. You can get + ``world_size`` with + ``smdistributed.dataparallel.torch.get_world_size()`` API. This is + invoked in the following code as ``dist.get_world_size()``. Also + supply the node rank using + ``smdistributed.dataparallel.torch.get_rank()``. This is invoked as + ``dist.get_rank()``. + + .. code:: python + + train_sampler = DistributedSampler(train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank()) + + +- Finally, modify your script to save checkpoints only on the leader + node. The leader node will have a synchronized model. This also + avoids worker nodes overwriting the checkpoints and possibly + corrupting the checkpoints. + +.. code:: python + + if dist.get_rank() == 0: + torch.save(...) + + +All put together, the following is an example PyTorch training script +you will have for distributed training with the distributed data parallel library: + +.. code:: python + + # Import distributed data parallel library PyTorch API + import smdistributed.dataparallel.torch.distributed as dist + + # Import distributed data parallel library PyTorch DDP + from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP + + # Initialize distributed data parallel library + dist.init_process_group() + + class Net(nn.Module): +     ... +     # Define model + + def train(...): +     ... +     # Model training + + def test(...): +     ... +     # Model evaluation + + def main(): + +     # Scale batch size by world size +     batch_size //= dist.get_world_size() // 8 +     batch_size = max(batch_size, 1) + +     # Prepare dataset +     train_dataset = torchvision.datasets.MNIST(...) + +     # Set num_replicas and rank in DistributedSampler +     train_sampler = torch.utils.data.distributed.DistributedSampler( +             train_dataset, +             num_replicas=dist.get_world_size(), +             rank=dist.get_rank()) + +     train_loader = torch.utils.data.DataLoader(..) + +     # Wrap the PyTorch model with distributed data parallel library’s DDP +     model = DDP(Net().to(device)) + +     # Pin each GPU to a single distributed data parallel library process. +     torch.cuda.set_device(local_rank) +     model.cuda(local_rank) + +     # Train +     optimizer = optim.Adadelta(...) +     scheduler = StepLR(...) +     for epoch in range(1, args.epochs + 1): +         train(...) +         if rank == 0: +             test(...) +         scheduler.step() + +     # Save model on master node. +     if dist.get_rank() == 0: +         torch.save(...) + + if __name__ == '__main__': +     main() + + +.. _pytorch-sdp-api-1.1.x: + :noindex: + +PyTorch API +=========== + +.. rubric:: Supported versions + +**PyTorch 1.7.1, 1.8.0** + + +.. function:: smdistributed.dataparallel.torch.distributed.is_available() + :noindex: + + Check if script started as a distributed job. For local runs user can + check that is_available returns False and run the training script + without calls to ``smdistributed.dataparallel``. 
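+
+   For example, a minimal sketch (the single-GPU fallback branch is
+   illustrative, not part of the library):
+
+   .. code:: python
+
+      import smdistributed.dataparallel.torch.distributed as dist
+
+      if dist.is_available():
+          # Launched as a SageMaker distributed data parallel job
+          dist.init_process_group()
+          world_size = dist.get_world_size()
+      else:
+          # Local run: proceed without smdistributed.dataparallel
+          world_size = 1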
+ + **Inputs:** + + - ``None`` + + **Returns:** + + - ``True`` if started as a distributed job, ``False`` otherwise + + +.. function:: smdistributed.dataparallel.torch.distributed.init_process_group(*args, **kwargs) + :noindex: + + Initialize ``smdistributed.dataparallel``. Must be called at the + beginning of the training script, before calling any other methods. + ​ + Process group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with ``torch.distributed`` only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + ​ + After this + call, ``smdistributed.dataparallel.torch.distributed.is_initialized()`` will + return ``True``. + ​ + + **Inputs:** + + - ``None`` + + **Returns:** + + - ``None`` + + +.. function:: smdistributed.dataparallel.torch.distributed.is_initialized() + :noindex: + + Checks if the default process group has been initialized. + + **Inputs:** + + - ``None`` + + **Returns:** + + - ``True`` if initialized, else ``False``. + + +.. function:: smdistributed.dataparallel.torch.distributed.get_world_size(group=smdistributed.dataparallel.torch.distributed.group.WORLD) + :noindex: + + The total number of GPUs across all the nodes in the cluster. For + example, in a 8 node cluster with 8 GPU each, size will be equal to 64. + + **Inputs:** + + - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + + **Returns:** + + - An integer scalar containing the total number of GPUs in the training + job, across all nodes in the cluster. + + +.. function:: smdistributed.dataparallel.torch.distributed.get_rank(group=smdistributed.dataparallel.torch.distributed.group.WORLD) + :noindex: + + The rank of the node in the cluster. The rank ranges from 0 to number of + nodes - 1. This is similar to MPI's World Rank. + + + **Inputs:** + + - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + + **Returns:** + + - An integer scalar containing the rank of the worker node. + + +.. function:: smdistributed.dataparallel.torch.distributed.get_local_rank() + :noindex: + + Local rank refers to the relative rank of + the ``smdistributed.dataparallel`` process within the node the current + process is running on. For example, if a node contains 8 GPUs, it has + 8 ``smdistributed.dataparallel`` processes. Each process has + a ``local_rank`` ranging from 0 to 7. + + **Inputs:** + + - ``None`` + + **Returns:** + + - An integer scalar containing the rank of the GPU and + its ``smdistributed.dataparallel`` process. + + +.. function:: smdistributed.dataparallel.torch.distributed.all_reduce(tensor, op=smdistributed.dataparallel.torch.distributed.ReduceOp.SUM, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) + :noindex: + + Performs an all-reduce operation on a tensor (torch.tensor) across + all ``smdistributed.dataparallel`` workers + + ``smdistributed.dataparallel`` AllReduce API can be used for all + reducing gradient tensors or any other tensors.  
By + default, ``smdistributed.dataparallel`` AllReduce reduces the tensor + data across all ``smdistributed.dataparallel`` workers in such a way + that all get the final result. + + After the call ``tensor`` is going to be bitwise identical in all + processes. + + **Inputs:** + + - ``tensor (torch.tensor) (required):`` Input and output of the collective. The function operates in-place. + + - ``op (smdistributed.dataparallel.torch.distributed.ReduceOp) (optional)``: The reduction operation to combine tensors across different ranks.  Defaults to ``SUM`` if None is given. + + * Supported ops: ``AVERAGE``, ``SUM``, ``MIN``, ``MAX`` + + - ``group (smdistributed.dataparallel.torch.distributed.group) (optional):`` Process group is not supported in ``smdistributed.dataparallel``. This parameter exists for API parity with torch.distributed only. + + * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + + - ``async_op (bool) (optional):`` Whether this op should be an async op. Defaults to ``False``. + + **Returns:** + + - Async op work handle, if async_op is set to True. ``None``, + otherwise. + + .. rubric:: Notes + + ``smdistributed.dataparallel.torch.distributed.allreduce``, in most + cases, is ~2X slower than all-reducing + with ``smdistributed.dataparallel.torch.parallel.distributed.DistributedDataParallel`` and + hence, it is not recommended to be used for performing gradient + reduction during the training + process. ``smdistributed.dataparallel.torch.distributed.allreduce`` internally + uses NCCL AllReduce with ``ncclSum`` as the reduction operation. + + +.. function:: smdistributed.dataparallel.torch.distributed.broadcast(tensor, src=0, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) + :noindex: + + Broadcasts the tensor (torch.tensor) to the whole group. + + ``tensor`` must have the same number of elements as GPUs in the + cluster. + + **Inputs:** + + - ``tensor (torch.tensor)(required)`` + + - ``src (int)(optional)`` + + - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process group is not supported in ``smdistributed.dataparallel``. This parameter exists for API parity with ``torch.distributed`` only. + + * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + + - ``async_op (bool)(optional):`` Whether this op should be an async op. Defaults to ``False``. + + **Returns:** + + - Async op work handle, if async_op is set to True. ``None``, otherwise. + + +.. function:: smdistributed.dataparallel.torch.distributed.all_gather(tensor_list, tensor, group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) + :noindex: + + Gathers tensors from the whole group in a list. + + + **Inputs:** + + - ``tensor_list (list[torch.tensor])(required):`` Output list. It + should contain correctly-sized tensors to be used for output of the + collective. + - ``tensor (torch.tensor)(required):`` Tensor to be broadcast from + current process. + - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + - ``async_op (bool)(optional):`` Whether this op should be an async op. + Defaults to ``False``. + + **Returns:** + + - Async op work handle, if async_op is set to True. ``None``, + otherwise. + + +.. 
function:: smdistributed.dataparallel.torch.distributed.all_to_all_single(output_t, input_t, output_split_sizes=None, input_split_sizes=None, group=group.WORLD, async_op=False) + :noindex: + + Each process scatters input tensor to all processes in a group and return gathered tensor in output. + + **Inputs:** + + - output_t + - input_t + - output_split_sizes + - input_split_sizes + - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + - ``async_op (bool)(optional):`` Whether this op should be an async op. + Defaults to ``False``. + + **Returns:** + + - Async op work handle, if async_op is set to True. ``None``, + otherwise. + + +.. function:: smdistributed.dataparallel.torch.distributed.barrier(group=smdistributed.dataparallel.torch.distributed.group.WORLD, async_op=False) + :noindex: + + Synchronizes all ``smdistributed.dataparallel`` processes. + + **Inputs:** + + - tensor (torch.tensor)(required): Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise. + + - src (int)(optional): Source rank. + + - ``group (smdistributed.dataparallel.torch.distributed.group)(optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. + + * Only supported value is ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` + + - ``async_op (bool)(optional):`` Whether this op should be an async op. + Defaults to ``False``. + + **Returns:** + + - Async op work handle, if async_op is set to True. ``None``, + otherwise. + + +.. class:: smdistributed.dataparallel.torch.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, broadcast_buffers=True, process_group=None, bucket_cap_mb=None) + :noindex: + + ``smdistributed.dataparallel's`` implementation of distributed data + parallelism for PyTorch. In most cases, wrapping your PyTorch Module + with ``smdistributed.dataparallel's`` ``DistributedDataParallel (DDP)`` is + all you need to do to use ``smdistributed.dataparallel``. + + Creation of this DDP class requires ``smdistributed.dataparallel`` + already initialized + with ``smdistributed.dataparallel.torch.distributed.init_process_group()``. + + This container parallelizes the application of the given module by + splitting the input across the specified devices by chunking in the + batch dimension. The module is replicated on each machine and each + device, and each such replica handles a portion of the input. During the + backwards pass, gradients from each node are averaged. + + The batch size should be larger than the number of GPUs used locally. + ​ + Example usage + of ``smdistributed.dataparallel.torch.parallel.DistributedDataParallel``: + + .. code:: python + + import torch + import smdistributed.dataparallel.torch.distributed as dist + from smdistributed.dataparallel.torch.parallel import DistributedDataParallel as DDP + + dist.init_process_group() + + # Pin GPU to be used to process local rank (one GPU per process) + torch.cuda.set_device(dist.get_local_rank()) + + # Build model and optimizer + model = ... 
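+      # Note: the learning rate below is scaled by the number of workers
+      # (dist.get_world_size()) to account for the larger effective batch size.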
+ optimizer = torch.optim.SGD(model.parameters(), +                             lr=1e-3 * dist.get_world_size()) + # Wrap model with smdistributed.dataparallel's DistributedDataParallel + model = DDP(model) + + **Parameters:** + + - ``module (torch.nn.Module)(required):`` PyTorch NN Module to be + parallelized + - ``device_ids (list[int])(optional):`` CUDA devices. This should only + be provided when the input module resides on a single CUDA device. + For single-device modules, + the ``ith module replica is placed on device_ids[i]``. For + multi-device modules and CPU modules, device_ids must be None or an + empty list, and input data for the forward pass must be placed on the + correct device. Defaults to ``None``. + - ``output_device (int)(optional):`` Device location of output for + single-device CUDA modules. For multi-device modules and CPU modules, + it must be None, and the module itself dictates the output location. + (default: device_ids[0] for single-device modules).  Defaults + to ``None``. + - ``broadcast_buffers (bool)(optional):`` Flag that enables syncing + (broadcasting) buffers of the module at beginning of the forward + function. ``smdistributed.dataparallel`` does not support broadcast + buffer yet. Please set this to ``False``. + - ``process_group(smdistributed.dataparallel.torch.distributed.group)(optional):`` Process + group is not supported in ``smdistributed.dataparallel``. This + parameter exists for API parity with torch.distributed only. Only + supported value is + ``smdistributed.dataparallel.torch.distributed.group.WORLD.`` Defaults + to ``None.`` + - ``bucket_cap_mb (int)(optional):`` DistributedDataParallel will + bucket parameters into multiple buckets so that gradient reduction of + each bucket can potentially overlap with backward + computation. ``bucket_cap_mb`` controls the bucket size in + MegaBytes (MB) (default: 25). + + .. rubric:: Notes + + - This module assumes all parameters are registered in the model by the + time it is created. No parameters should be added nor removed later. + - This module assumes all parameters are registered in the model of + each distributed processes are in the same order. The module itself + will conduct gradient all-reduction following the reverse order of + the registered parameters of the model. In other words, it is users’ + responsibility to ensure that each distributed process has the exact + same model and thus the exact same parameter registration order. + - You should never change the set of your model’s parameters after + wrapping up your model with DistributedDataParallel. In other words, + when wrapping up your model with DistributedDataParallel, the + constructor of DistributedDataParallel will register the additional + gradient reduction functions on all the parameters of the model + itself at the time of construction. If you change the model’s + parameters after the DistributedDataParallel construction, this is + not supported and unexpected behaviors can happen, since some + parameters’ gradient reduction functions might not get called. + + +.. class:: smdistributed.dataparallel.torch.distributed.ReduceOp + :noindex: + + An enum-like class for supported reduction operations + in ``smdistributed.dataparallel``. + + The values of this class can be accessed as attributes, for + example, ``ReduceOp.SUM``. They are used in specifying strategies for + reduction collectives such as +  ``smdistributed.dataparallel.torch.distributed.all_reduce(...)``. 
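+
+   For example, a minimal sketch of summing a tensor across all workers
+   (assumes the script has already called ``init_process_group()`` and
+   pinned one GPU per process, as shown above):
+
+   .. code:: python
+
+      import torch
+      import smdistributed.dataparallel.torch.distributed as dist
+
+      # Each worker contributes 1; after SUM, t equals the world size everywhere.
+      t = torch.ones(1).cuda()
+      dist.all_reduce(t, op=dist.ReduceOp.SUM)
+
+   The supported values are: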
+ + - ``AVERAGE`` + - ``SUM`` + - ``MIN`` + - ``MAX`` diff --git a/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst new file mode 100644 index 0000000000..d8b6fdf960 --- /dev/null +++ b/doc/api/training/sdp_versions/v1.1.x/smd_data_parallel_tensorflow.rst @@ -0,0 +1,558 @@ +################################################################# +TensorFlow Guide to SageMaker's distributed data parallel library +################################################################# + +.. admonition:: Contents + + - :ref:`tensorflow-sdp-modify` + - :ref:`tensorflow-sdp-api` + +.. _tensorflow-sdp-modify: + :noindex: + +Modify a TensorFlow 2.x training script to use SageMaker data parallel +====================================================================== + +The following steps show you how to convert a TensorFlow 2.x training +script to utilize the distributed data parallel library. + +The distributed data parallel library APIs are designed to be close to Horovod APIs. +See `SageMaker distributed data parallel TensorFlow examples +`__ +for additional details on how to implement the data parallel library. + +- First import the distributed data parallel library’s TensorFlow client and initialize it: + + .. code:: python + + import smdistributed.dataparallel.tensorflow as sdp + sdp.init() + + +- Pin each GPU to a single smdistributed.dataparallel process + with ``local_rank`` - this refers to the relative rank of the + process within a given node. ``sdp.tensorflow.local_rank()`` API + provides you the local rank of the device. The leader node will be + rank 0, and the worker nodes will be rank 1, 2, 3, and so on. This is + invoked in the next code block as ``sdp.local_rank()``. + ``set_memory_growth`` is not directly related to SMD, but must be set + for distributed training with TensorFlow. + + .. code:: python + + gpus = tf.config.experimental.list_physical_devices('GPU') + for gpu in gpus: +     tf.config.experimental.set_memory_growth(gpu, True) + if gpus: +     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') + + +- Scale the learning rate by the number of workers. + ``sdp.tensorflow.size()`` API provides you number of workers in the + cluster. This is invoked in the next code block as ``sdp.size()``. + + .. code:: python + + learning_rate = learning_rate * sdp.size() + + +- Use the library’s ``DistributedGradientTape`` to optimize AllReduce + operations during training. This wraps ``tf.GradientTape``. + + .. code:: python + + with tf.GradientTape() as tape: +       output = model(input) +       loss_value = loss(label, output) + + # Wrap tf.GradientTape with the library's DistributedGradientTape + tape = sdp.DistributedGradientTape(tape) + + +- Broadcast initial model variables from the leader node (rank 0) to + all the worker nodes (ranks 1 through n). This is needed to ensure a + consistent initialization across all the worker ranks. For this, you + use ``sdp.tensorflow.broadcast_variables`` API after the + model and optimizer variables are initialized. This is invoked in the + next code block as ``sdp.broadcast_variables()``. + + .. code:: python + + sdp.broadcast_variables(model.variables, root_rank=0) + sdp.broadcast_variables(opt.variables(), root_rank=0) + + +- Finally, modify your script to save checkpoints only on the leader + node. The leader node will have a synchronized model. 
This also + avoids worker nodes overwriting the checkpoints and possibly + corrupting the checkpoints. + + .. code:: python + + if sdp.rank() == 0: +     checkpoint.save(checkpoint_dir) + + +All put together, the following is an example TensorFlow2 training +script you will have for distributed training with the library. + +.. code:: python + + import tensorflow as tf + + # Import the library's TF API + import smdistributed.dataparallel.tensorflow as sdp + + # Initialize the library + sdp.init() + + gpus = tf.config.experimental.list_physical_devices('GPU') + for gpu in gpus: +     tf.config.experimental.set_memory_growth(gpu, True) + if gpus: +     # Pin GPUs to a single process +     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') + + # Prepare Dataset + dataset = tf.data.Dataset.from_tensor_slices(...) + + # Define Model + mnist_model = tf.keras.Sequential(...) + loss = tf.losses.SparseCategoricalCrossentropy() + + # Scale Learning Rate + # LR for 8 node run : 0.000125 + # LR for single node run : 0.001 + opt = tf.optimizers.Adam(0.000125 * sdp.size()) + + @tf.function + def training_step(images, labels, first_batch): +     with tf.GradientTape() as tape: +         probs = mnist_model(images, training=True) +         loss_value = loss(labels, probs) + +     # Wrap tf.GradientTape with the library's DistributedGradientTape +     tape = sdp.DistributedGradientTape(tape) + +     grads = tape.gradient(loss_value, mnist_model.trainable_variables) +     opt.apply_gradients(zip(grads, mnist_model.trainable_variables)) + +     if first_batch: +        # Broadcast model and optimizer variables +        sdp.broadcast_variables(mnist_model.variables, root_rank=0) +        sdp.broadcast_variables(opt.variables(), root_rank=0) + +     return loss_value + + ... + + # Save checkpoints only from master node. + if sdp.rank() == 0: +     checkpoint.save(checkpoint_dir) + + +.. _tensorflow-sdp-api: + :noindex: + +TensorFlow API +============== + +.. rubric:: Supported versions + +TensorFlow is supported in version 1.0.0 of ``sagemakerdistributed.dataparallel``. +Reference version 1.0.0 `TensorFlow API documentation +`_ +for supported TensorFlow versions. + +.. function:: smdistributed.dataparallel.tensorflow.init() + :noindex: + + Initialize ``smdistributed.dataparallel``. Must be called at the + beginning of the training script. + + + **Inputs:** + + - ``None`` + + **Returns:** + + - ``None`` + + + .. rubric:: Notes + + ``init()`` needs to be called only once. It will throw an error if + called more than once: + + ``init() called more than once. smdistributed.dataparallel is already initialized.`` + + +.. function:: smdistributed.dataparallel.tensorflow.size() + :noindex: + + The total number of GPUs across all the nodes in the cluster. For + example, in a 8 node cluster with 8 GPUs each, ``size`` will be equal + to 64. + + + **Inputs:** + + - ``None`` + + **Returns:** + + - An integer scalar containing the total number of GPUs, across all + nodes in the cluster. + + +.. function:: smdistributed.dataparallel.tensorflow.local_size() + :noindex: + + The total number of GPUs on a node. For example, on a node with 8 + GPUs, ``local_size`` will be equal to 8. + + **Inputs:** + + - ``None`` + + **Returns:** + + - An integer scalar containing the total number of GPUs on itself. + + +.. function:: smdistributed.dataparallel.tensorflow.rank() + :noindex: + + The rank of the node in the cluster. The rank ranges from 0 to number of + nodes - 1. This is similar to MPI's World Rank. 
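+
+   For example, as in the training script above, a common pattern is to
+   restrict checkpointing to rank 0 (``checkpoint`` and ``checkpoint_dir``
+   are assumed to be defined as in that script):
+
+   .. code:: python
+
+      import smdistributed.dataparallel.tensorflow as sdp
+
+      # Save checkpoints only from the leader (rank 0) process.
+      if sdp.rank() == 0:
+          checkpoint.save(checkpoint_dir)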
+
+   **Inputs:**
+
+   - ``None``
+
+   **Returns:**
+
+   - An integer scalar containing the rank of the node.
+
+
+.. function:: smdistributed.dataparallel.tensorflow.local_rank()
+   :noindex:
+
+   Local rank refers to the relative rank of the ``smdistributed.dataparallel``
+   process within the node. For example, if a node contains 8 GPUs, it runs
+   8 ``smdistributed.dataparallel`` processes, and each process gets a local
+   rank ranging from 0 to 7.
+
+   **Inputs:**
+
+   - ``None``
+
+   **Returns:**
+
+   - An integer scalar containing the rank of the GPU and
+     its ``smdistributed.dataparallel`` process.
+
+
+.. function:: smdistributed.dataparallel.tensorflow.allreduce(tensor, param_index, num_params, compression=Compression.none, op=ReduceOp.AVERAGE)
+   :noindex:
+
+   Performs an all-reduce operation on a tensor (``tf.Tensor``).
+
+   The ``smdistributed.dataparallel`` AllReduce API can be used to all-reduce
+   gradient tensors or any other tensors. By default,
+   ``smdistributed.dataparallel`` AllReduce averages the tensors across the
+   participating workers.
+
+   **Inputs:**
+
+   - ``tensor (tf.Tensor)(required)``: The tensor to be all-reduced. The shape of the input must be identical across all ranks.
+   - ``param_index (int)(required):`` 0 if you are reducing a single tensor. Index of the tensor if you are reducing a list of tensors.
+   - ``num_params (int)(required):`` ``len(tensor)``.
+   - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
+
+     * Supported compression types - ``none``, ``fp16``
+
+   - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different ranks. Defaults to ``AVERAGE`` if ``None`` is given.
+
+     * Supported ops: ``SUM``, ``MIN``, ``MAX``, ``AVERAGE``
+
+   **Returns:**
+
+   - A tensor of the same shape and type as the input ``tensor``, all-reduced across all the processes.
+
+
+.. function:: smdistributed.dataparallel.tensorflow.broadcast_global_variables(root_rank)
+   :noindex:
+
+   Broadcasts all global variables from root rank to all other processes.
+
+   **Inputs:**
+
+   - ``root_rank (int)(required):`` Rank of the process from which global
+     variables will be broadcasted to all other processes.
+
+   **Returns:**
+
+   - ``None``
+
+
+.. function:: smdistributed.dataparallel.tensorflow.broadcast_variables(variables, root_rank)
+   :noindex:
+
+   Applicable to TensorFlow 2.x only.
+
+   Broadcasts variables from root rank to all other processes.
+
+   With TensorFlow 2.x, ``broadcast_variables`` is used to
+   broadcast ``model.variables`` and ``optimizer.variables`` after
+   initialization from the leader node to all the worker nodes. This
+   ensures a consistent initialization across all the worker ranks.
+
+   **Inputs:**
+
+   - ``variables (tf.Variable)(required):`` Variables to be broadcasted.
+   - ``root_rank (int)(required):`` Rank of the process from which
+     variables will be broadcasted to all other processes.
+
+   **Returns:**
+
+   - ``None``
+
+
+.. function:: smdistributed.dataparallel.tensorflow.oob_allreduce(tensor, compression=Compression.none, op=ReduceOp.AVERAGE)
+   :noindex:
+
+   Out-of-band (oob) AllReduce is a simplified AllReduce function for use
+   cases such as calculating the total loss across all GPUs during training.
+   ``oob_allreduce`` averages the tensors across the worker nodes as its
+   reduction operation.
+ + **Inputs:** + + - ``tensor (tf.Tensor)(required)``: The tensor to be all-reduced. The shape of the input must be identical across all worker nodes. + - ``compression`` (optional): Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. + + *  Supported compression types - ``none``, ``fp16`` + + - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different worker nodes. Defaults to ``Average`` if None is given. + + * Supported ops: ``AVERAGE`` + + **Returns:** + + - ``None`` + + .. rubric:: Notes + + ``smdistributed.dataparallel.tensorflow.oob_allreduce``, in most + cases, is ~2x slower + than ``smdistributed.dataparallel.tensorflow.allreduce``  so it is not + recommended to be used for performing gradient reduction during the + training + process. ``smdistributed.dataparallel.tensorflow.oob_allreduce`` internally + uses NCCL AllReduce with ``ncclSum`` as the reduction operation. + + +.. function:: smdistributed.dataparallel.tensorflow.overlap(tensor) + :noindex: + + This function is applicable only for models compiled with XLA. Use this + function to enable ``smdistributed.dataparallel`` to efficiently + overlap backward pass with the all reduce operation. + + Example usage: + + .. code:: python + + layer = tf.nn.dropout(...) # Or any other layer + layer = smdistributed.dataparallel.tensorflow.overlap(layer) + + The overlap operation is inserted into the TF graph as a node. It + behaves as an identity operation, and helps in achieving the + communication overlap with backward pass operation. + + **Inputs:** + + - ``tensor (tf.Tensor)(required):`` The tensor to be all-reduced. + + **Returns:** + + - ``None`` + + .. rubric:: Notes + + This operation helps in speeding up distributed training, as + the AllReduce operation does not have to wait for all the gradients to + be ready. Backward propagation proceeds sequentially from the output + layer of the network to the input layer. When the gradient computation + for a layer finishes, ``smdistributed.dataparallel`` adds them to a + fusion buffer. As soon as the size of the fusion buffer reaches a + predefined threshold (25 Mb), ``smdistributed.dataparallel`` starts + the AllReduce operation. + + +.. function:: smdistributed.dataparallel.tensorflow.broadcast(tensor, root_rank) + :noindex: + + Broadcasts the input tensor on root rank to the same input tensor on all + other ``smdistributed.dataparallel`` processes. + ​ + The broadcast will not start until all processes are ready to send and + receive the tensor. + + **Inputs:** + + - ``tensor (tf.Tensor)(required):`` The tensor to be broadcasted. + - ``root_rank (int)(required):`` Rank of the process from which + tensor will be broadcasted to all other processes. + + **Returns:** + + - A tensor of the same shape and type as tensor, with the value + broadcasted from root rank. + + +.. function:: smdistributed.dataparallel.tensorflow.shutdown() + :noindex: + + Shuts down ``smdistributed.dataparallel``. Optional to call at the end + of the training script. + + **Inputs:** + + - ``None`` + + **Returns:** + + - ``None`` + + +.. function:: smdistributed.dataparallel.tensorflow.DistributedOptimizer + :noindex: + + Applicable if you use the ``tf.estimator`` API in TensorFlow 2.x (2.3.1). 
+ ​ + Construct a new ``DistributedOptimizer`` , which uses TensorFlow + optimizer under the hood for computing single-process gradient values + and applying gradient updates after the gradient values have been + combined across all ``smdistributed.dataparallel`` workers. + ​ + Example usage: + + .. code:: python + + opt = ... # existing optimizer from tf.train package or your custom optimizer + opt = smdistributed.dataparallel.tensorflow.DistributedOptimizer(opt) + + + - ``optimizer (tf.train.Optimizer)(required):`` TF Optimizer to use for computing gradients and applying updates. + + - ``name (str)(optional):`` Name prefix for the operations created when applying gradients. Defaults to ``smdistributed.dataparallel`` followed by provided optimizer type. + + - ``use_locking (bool)(optional):`` Whether to use locking when updating variables. Defaults to ``False``. + + - ``device_dense:`` Not supported. Raises not supported error. + + - ``device_sparse:`` Not supported. Raises not supported error. + + - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. + + *  Supported compression types - ``none``, ``fp16`` + + - ``sparse_as_dense:`` Not supported. Raises not supported error. + + - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different ranks. Defaults to ``Average`` if None is given. + + * Supported ops: ``AVERAGE`` + + - ``bucket_cap_mb (int)(optional):`` Size of ``smdistributed.dataparallel`` fusion buffer size. Defaults to 25MB that works optimally for most case. If you provide a value, expects the (value * 1024 * 1024) i.e., bytes to be multiple of 128. + + +.. function:: smdistributed.dataparallel.tensorflow.DistributedGradientTape + :noindex: + + Applicable to TensorFlow 2.x only. + + Construct a new ``DistributedGradientTape``, which uses + TensorFlow’s ``GradientTape`` under the hood, using an AllReduce to + combine gradient values before applying gradients to model weights. + ​ + Example Usage: + + .. code:: python + + with tf.GradientTape() as tape: +       output = model(input) +       loss_value = loss(label, output) + + # Wrap in smdistributed.dataparallel's DistributedGradientTape + tape = smdistributed.dataparallel.tensorflow.DistributedGradientTape(tape) + + + - ``gradtape (tf.GradientTape)(required):`` GradientTape to use for computing gradients and applying updates. + + - ``device_dense:`` Not supported. Raises not supported error. + + - ``device_sparse:`` Not supported. Raises not supported error. + + - ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression. + + *  Supported compression types - ``none``, ``fp16`` + + - ``op (smdistributed.dataparallel.tensorflow.ReduceOp)(optional)``: The reduction operation to combine tensors across different ranks. Defaults to ``Average`` if None is given. + + * Supported ops: ``AVERAGE`` + + +.. function:: smdistributed.dataparallel.tensorflow.BroadcastGlobalVariablesHook + :noindex: + + Applicable if you use the ``tf.estimator`` API in TensorFlow 2.x (2.3.1). + + + ``SessionRunHook`` that will broadcast all global variables from root + rank to all other processes during initialization. 
+
+   This is necessary to ensure consistent initialization of all workers
+   when training is started with random weights or restored from a
+   checkpoint.
+
+   Example usage:
+
+   .. code:: python
+
+      hooks = [smdistributed.dataparallel.tensorflow.BroadcastGlobalVariablesHook(root_rank=0)]
+      ...
+      with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
+                                             hooks=hooks,
+                                             config=config) as mon_sess:
+          ...
+
+
+   - ``root_rank (int)(required):`` Rank of the process from which global
+     variables will be broadcasted to all other processes.
+
+
+.. function:: smdistributed.dataparallel.tensorflow.Compression
+   :noindex:
+
+   Optional gradient compression algorithm that can be used in the AllReduce
+   operation.
+
+   - ``none``: alias for ``NoneCompression``. Do not compress gradient
+     tensors.
+   - ``fp16``: alias for ``FP16Compression``. Compress the floating point
+     gradient tensors to 16-bit (FP16).
+
+
+.. function:: smdistributed.dataparallel.tensorflow.ReduceOp
+   :noindex:
+
+   Supported reduction operations in ``smdistributed.dataparallel``.
+
+   - ``AVERAGE``
+   - ``SUM``
+   - ``MIN``
+   - ``MAX``
diff --git a/doc/api/training/sdp_versions/v1_1_x.rst b/doc/api/training/sdp_versions/v1_1_x.rst
new file mode 100644
index 0000000000..90c3a984a7
--- /dev/null
+++ b/doc/api/training/sdp_versions/v1_1_x.rst
@@ -0,0 +1,9 @@
+
+Version 1.1.x
+=============
+
+.. toctree::
+   :maxdepth: 1
+
+   v1.1.x/smd_data_parallel_pytorch.rst
+   v1.1.x/smd_data_parallel_tensorflow.rst
diff --git a/doc/api/training/smd_data_parallel.rst b/doc/api/training/smd_data_parallel.rst
index 5092395585..8c76cd2f46 100644
--- a/doc/api/training/smd_data_parallel.rst
+++ b/doc/api/training/smd_data_parallel.rst
@@ -85,6 +85,7 @@ Select a version to see the API documentation for version.
    :maxdepth: 1
 
    sdp_versions/latest.rst
+   sdp_versions/v1_1_x.rst
    sdp_versions/v1_0_0.rst
 
 .. important::