Distributed training with TensorFlow

Overview

tf.distribute.Strategy is a TensorFlow API to distribute training across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.


Types of strategies

tf.distribute.Strategy intends to cover a number of use cases along different axes. Some of these combinations are currently supported and others will be added in the future. Some of these axes are:

  • Synchronous vs asynchronous training: These are two common ways of distributing training with data parallelism. In sync training, all workers train over different slices of the input data in sync and aggregate gradients at each step. In async training, all workers train independently over the input data and update variables asynchronously. Typically, sync training is supported via all-reduce and async training via a parameter server architecture.

  • Hardware platform: You may want to scale your training onto multiple GPUs on one machine, or multiple machines in a network (with 0 or more GPUs each), or on Cloud TPUs.

| Training API         | MirroredStrategy     | TPUStrategy          | MultiWorkerMirroredStrategy | CentralStorageStrategy   | ParameterServerStrategy    | OneDeviceStrategy |
| -------------------- | -------------------- | -------------------- | --------------------------- | ------------------------ | -------------------------- | ----------------- |
| Keras API            | Supported            | Experimental support | Experimental support        | Experimental support     | Supported planned post 2.0 | Supported         |
| Custom training loop | Experimental support | Experimental support | Support planned post 2.0    | Support planned post 2.0 | No support yet             | Supported         |
| Estimator API        | Limited Support      | Not supported        | Limited Support             | Limited Support          | Limited Support            | Limited Support   |

MirroredStrategy

tf.distribute.MirroredStrategy supports synchronous distributed training on multiple GPUs on one machine. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. Together, these variables form a single conceptual variable called MirroredVariable. These variables are kept in sync with each other by applying identical updates.

Efficient all-reduce algorithms are used to communicate the variable updates across the devices.

All-reduce aggregates tensors across all the devices by adding them up, and makes them available on each device. It’s a fused algorithm that is very efficient and can reduce the overhead of synchronization significantly.

There are many all-reduce algorithms and implementations available, depending on the type of communication available between devices. By default, it uses NVIDIA NCCL as the all-reduce implementation. You can choose from a few other options we provide, or write your own.

Here is the simplest way of creating MirroredStrategy:

mirrored_strategy = tf.distribute.MirroredStrategy()

This will create a MirroredStrategy instance which will use all the GPUs that are visible to TensorFlow, and NCCL for cross-device communication.

If you wish to use only some of the GPUs on your machine, you can do so like this:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

Currently, tf.distribute.HierarchicalCopyAllReduce and tf.distribute.ReductionToOneDevice are two options other than tf.distribute.NcclAllReduce, which is the default. You can specify them via the cross_device_ops argument:

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy does synchronous training as well. Variables are not mirrored; instead, they are placed on the CPU, and operations are replicated across all local GPUs. If there is only one GPU, all variables and operations will be placed on that GPU.

Create an instance of CentralStorageStrategy by:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to MirroredStrategy, it creates copies of all variables in the model on each device across all workers.

It uses CollectiveOps as the multi-worker all-reduce communication method to keep variables in sync.

A collective op is a single op in the TensorFlow graph which can automatically choose an all-reduce algorithm in the TensorFlow runtime according to hardware, network topology and tensor sizes.

It also implements additional performance optimizations. For example, it includes a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors.

How?

MultiWorkerMirroredStrategy currently allows you to choose between two different implementations of collective ops. CollectiveCommunication.RING implements ring-based collectives using gRPC as the communication layer. CollectiveCommunication.NCCL uses Nvidia’s NCCL to implement collectives.

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
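
Each worker process also needs to know its role in the cluster. This is commonly configured through the TF_CONFIG environment variable before the strategy is created; a minimal sketch, with placeholder host addresses:

import json
import os

# Placeholder addresses; replace with the real host:port of each worker.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["host1:12345", "host2:12345"]},
    "task": {"type": "worker", "index": 0}  # this process is worker 0
})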

TPUStrategy

tf.distribute.experimental.TPUStrategy lets you run your TensorFlow training on Tensor Processing Units (TPUs).
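
The snippet below assumes a tpu_address variable identifying your TPU; how you set it depends on your environment. A rough sketch:

# Placeholder: the name or gRPC address of your Cloud TPU, e.g. "my-tpu"
# or "grpc://10.240.1.2:8470". On Colab, an empty string usually lets the
# resolver discover the TPU automatically.
tpu_address = ""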

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy supports parameter server training on multiple machines. In this setup, some machines are designated as workers and some as parameter servers. Each variable of the model is placed on one parameter server. Computation is replicated across all GPUs of all the workers.
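
As with multi-worker training, each process learns about the cluster through TF_CONFIG. A minimal sketch with placeholder addresses, as run on the worker with index 0:

import json
import os

# Placeholder cluster: one worker and one parameter server.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker0.example.com:2222"],
        "ps": ["ps0.example.com:2222"]
    },
    "task": {"type": "worker", "index": 0}
})
ps_strategy = tf.distribute.experimental.ParameterServerStrategy()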

OneDeviceStrategy

tf.distribute.OneDeviceStrategy runs on a single device. This strategy will place any variables created in its scope on the specified device. Input distributed through this strategy will be prefetched to the specified device. Moreover, any functions called via strategy.run will also be placed on the specified device.

You can use this strategy to test your code before switching to other strategies which actually distribute across multiple devices/machines.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Using tf.distribute.Strategy with Keras

By integrating tf.distribute.Strategy into the tf.keras backend, we've made it seamless for you to distribute training code written in the Keras training framework.

Here’s what you need to change in your code:

  • Create an instance of the appropriate tf.distribute.Strategy
  • Move the creation and compiling of the Keras model inside strategy.scope().

We support all types of Keras models - sequential, functional and subclassed.

Here is a snippet of code to do this for a very simple Keras model with one dense layer:

mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    model.compile(loss='mse', optimizer='sgd')

In this example we used MirroredStrategy so we can run this on a machine with multiple GPUs.

strategy.scope() indicates which parts of the code to run distributed. Creating the model inside this scope lets us create mirrored variables instead of regular variables. Compiling under the scope lets the strategy know that you intend to train this model with it. Once this is set up, you can fit your model like you would normally.

MirroredStrategy takes care of replicating the model’s training on the available GPUs, aggregating gradients, and more.

# tf.data.Dataset as dataset
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

# np array as dataset
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

In both cases (dataset or numpy), each batch of the given input is divided equally among the multiple replicas. For instance, if using MirroredStrategy with 2 GPUs, each batch of size 10 will get divided among the 2 GPUs, with each receiving 5 input examples in each step.

Each epoch will then train faster as you add more GPUs. Typically, you would want to increase your batch size as you add more accelerators so as to make effective use of the extra computing power. You will also need to re-tune your learning rate, depending on the model.

How to merge these replicated models?

You can use strategy.num_replicas_in_sync to get the number of replicas.

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Using tf.distribute.Strategy with custom training loops

If you need more flexibility and control over your training loops than is possible with Estimator or Keras, you can write custom training loops.

For instance, when using a GAN, you may want to take a different number of generator or discriminator steps each round. Similarly, high-level frameworks are not very suitable for reinforcement learning training.

with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    optimizer = tf.keras.optimizers.SGD()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        features, labels = inputs

        with tf.GradientTape() as tape:
            # training=True is only needed if there are layers with different
            # behavior during training versus inference (e.g. Dropout).
            logits = model(features, training=True)
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
                logits=logits, labels=labels)
            # Scale the total loss by the global batch size.
            loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
        return cross_entropy

    per_example_losses = mirrored_strategy.run(step_fn, args=(dist_inputs,))
    mean_loss = mirrored_strategy.reduce(
        tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0)
    return mean_loss

with mirrored_strategy.scope():
    for inputs in dist_dataset:
        print(train_step(inputs))
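
If you want to run a fixed number of steps rather than loop over the whole dataset, you can also step through the distributed dataset with an explicit iterator; a small sketch:

with mirrored_strategy.scope():
    iterator = iter(dist_dataset)
    # Run exactly 10 training steps.
    for _ in range(10):
        print(train_step(next(iterator)))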

Using tf.distribute.Strategy with Estimator (Limited support)

tf.estimator is a distributed training TensorFlow API that originally supported the async parameter server approach.

We pass the strategy object into the RunConfig for the Estimator.

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)

def input_fn():
    dataset = tf.data.Dataset.from_tensors(({"feats": [1.]}, [1.]))
    return dataset.repeat(1000).batch(10)

regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)

Another difference to highlight here between Estimator and Keras is the input handling. In Estimator, we do not automatically split each batch, nor automatically shard the data across different workers. You have full control over how you want your data to be distributed across workers and devices, and you must provide an input_fn to specify how to distribute your data.

Your input_fn is called once per worker, thus giving one dataset per worker. Then one batch from that dataset is fed to one replica on that worker, thereby consuming N batches for N replicas on 1 worker. In other words, the dataset returned by the input_fn should provide batches of size PER_REPLICA_BATCH_SIZE. And the global batch size for a step can be obtained as PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync.
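
For example, with a per-replica batch size of 10 and 2 replicas on a worker, that worker consumes a global batch of 20 examples per step. A minimal sketch of an input_fn that batches by the per-replica batch size (the names here are illustrative, not part of the API):

PER_REPLICA_BATCH_SIZE = 10

def input_fn():
    dataset = tf.data.Dataset.from_tensors(({"feats": [1.]}, [1.]))
    # Batch by the per-replica size; each replica on the worker consumes
    # one such batch per step.
    return dataset.repeat(1000).batch(PER_REPLICA_BATCH_SIZE)

global_batch_size = (PER_REPLICA_BATCH_SIZE *
                     mirrored_strategy.num_replicas_in_sync)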

Use a GPU

This guide is for users who have tried the above approaches and found that they need fine-grained control over how TensorFlow uses the GPU.

import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Logging device placement

To find out which devices your operations and tensors are assigned to, put tf.debugging.set_log_device_placement(True) as the first statement of your program. Enabling device placement logging causes any Tensor allocations or operations to be printed.

tf.debugging.set_log_device_placement(True)

# Create some tensors
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
c = tf.matmul(a, b)

print(c)

Manual device placement

If you would like a particular operation to run on a device of your choice instead of what’s automatically selected for you, you can use with tf.device to create a device context, and all the operations within that context will run on the same designated device.

tf.debugging.set_log_device_placement(True)

# Place tensors on the CPU
with tf.device('/CPU:0'):
    a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
    b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

c = tf.matmul(a, b)
print(c)

Limiting GPU memory growth

By default, TensorFlow maps nearly all of the GPU memory of all GPUs (subject to CUDA_VISIBLE_DEVICES) visible to the process. This is done to use the relatively precious GPU memory resources on the devices more efficiently by reducing memory fragmentation. To limit TensorFlow to a specific set of GPUs, use the tf.config.experimental.set_visible_devices method.

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # Restrict TensorFlow to only use the first GPU
    try:
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
    except RuntimeError as e:
        # Visible devices must be set before GPUs have been initialized
        print(e)

In some cases you may instead want the process to allocate only a subset of the available memory, or to grow its memory use as needed. The first method for this is to turn on memory growth with tf.config.experimental.set_memory_growth, which attempts to allocate only as much GPU memory as is needed at runtime:

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

The second method is to configure a virtual GPU device with tf.config.experimental.set_virtual_device_configuration and set a hard limit on the total memory to allocate on the GPU.

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # Restrict TensorFlow to only allocate 1GB of memory on the first GPU
    try:
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Virtual devices must be set before GPUs have been initialized
        print(e)

Using a single GPU on a multi-GPU system

If you have more than one GPU in your system, the GPU with the lowest ID will be selected by default. If you would like to run on a different GPU, you will need to specify the preference explicitly:

tf.debugging.set_log_device_placement(True)

try:
    # Specify an invalid GPU device
    with tf.device('/device:GPU:2'):
        a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
        b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
        c = tf.matmul(a, b)
except RuntimeError as e:
    print(e)

If you would like TensorFlow to automatically choose an existing and supported device to run the operations in case the specified one doesn’t exist, you can call tf.config.set_soft_device_placement(True).
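
A minimal sketch mirroring the snippet above: with soft placement enabled, requesting a non-existent device no longer raises an error, and the matmul simply runs on an available device instead.

tf.config.set_soft_device_placement(True)
tf.debugging.set_log_device_placement(True)

# /device:GPU:2 may not exist; with soft placement enabled, TensorFlow
# falls back to an available device instead of raising an error.
with tf.device('/device:GPU:2'):
    a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
    b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
    c = tf.matmul(a, b)
print(c)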

Using multiple GPUs

Developing for multiple GPUs will allow a model to scale with the additional resources. If developing on a system with a single GPU, we can simulate multiple GPUs with virtual devices. This enables easy testing of multi-GPU setups without requiring additional resources.

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # Create 2 virtual GPUs with 1GB memory each
    try:
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
             tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPU,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Virtual devices must be set before GPUs have been initialized
        print(e)

The best practice for using multiple GPUs is to use tf.distribute.Strategy. Here is a simple example:

tf.debugging.set_log_device_placement(True)

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    inputs = tf.keras.layers.Input(shape=(1,))
    predictions = tf.keras.layers.Dense(1)(inputs)
    model = tf.keras.models.Model(inputs=inputs, outputs=predictions)
    model.compile(loss='mse',
                  optimizer=tf.keras.optimizers.SGD(learning_rate=0.2))

Error info: NCCL does not work with logical (virtual) GPUs, so running MirroredStrategy on the virtual GPUs created above may fail with an error like "NCCL cannot support logical gpus".
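
One possible workaround, using the cross-device ops introduced earlier, is to move MirroredStrategy off NCCL; a hedged sketch:

# If MirroredStrategy fails with "NCCL cannot support logical gpus" on
# virtual GPUs, switch the cross-device communication away from NCCL,
# e.g. to HierarchicalCopyAllReduce or ReductionToOneDevice:
strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())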