[Keras] TensorFlow distributed training

When we have a large amount of computing resources, an appropriate distributed strategy lets us make full use of them and greatly reduce model training time. For different usage scenarios, TensorFlow provides several distributed strategies in tf.distribute.Strategy, which enable us to train models more efficiently.

1, Single-machine multi-card training: MirroredStrategy

tf.distribute.MirroredStrategy is a simple, high-performance, synchronous data-parallel distributed strategy. It mainly supports training on multiple GPUs on the same host. When using this strategy, we only need to instantiate a MirroredStrategy strategy:

strategy = tf.distribute.MirroredStrategy()

and put the model-building code inside the context of strategy.scope():

with strategy.scope():
    # Model building code

You can also specify which devices to use via the devices parameter, for example:

strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

That is, only GPUs 0 and 1 will participate in the distributed strategy.
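If you are unsure how many GPUs are visible to TensorFlow, you can list them first and then decide which devices to pass in. The following is a minimal sketch using tf.config.list_physical_devices; it assumes at least two GPUs are present, and the "/gpu:N" names follow TensorFlow's standard device naming:

import tensorflow as tf

# List the physical GPUs visible to this process
gpus = tf.config.list_physical_devices('GPU')
print(gpus)

# Use only the first two GPUs for the mirrored strategy (assumes at least two are present)
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])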

The following code shows how to use Keras with the MirroredStrategy strategy to train MobileNetV2 on one of the image datasets in TensorFlow Datasets:

import tensorflow as tf
import tensorflow_datasets as tfds

num_epochs = 5
batch_size_per_replica = 64
learning_rate = 0.001

strategy = tf.distribute.MirroredStrategy()
print('Number of devices: %d' % strategy.num_replicas_in_sync)  # Print the number of devices
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

# Load dataset and preprocess
def resize(image, label):
    image = tf.image.resize(image, [224, 224]) / 255.0
    return image, label

# Use TensorFlow Datasets to load the cats vs. dogs classification dataset. See the chapter "TensorFlow Datasets loading" for details
dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(resize).shuffle(1024).batch(batch_size)

with strategy.scope():
    model = tf.keras.applications.MobileNetV2(weights=None, classes=2)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )

model.fit(dataset, epochs=num_epochs)

In the following tests, we use four NVIDIA GeForce GTX 1080 Ti graphics cards on the same host for single-machine multi-card model training. All tests ran for 5 epochs. In the single-machine non-distributed case, although the machine still has four graphics cards, the program trains directly without any distributed settings, with the Batch Size set to 64. In the single-machine four-card case, we test a total Batch Size of 64 (16 per card) and a total Batch Size of 256 (64 per card).

Dataset       | Single machine, no distribution (Batch Size 64) | Single machine, four cards (total Batch Size 64) | Single machine, four cards (total Batch Size 256)
cats_vs_dogs  | 146s/epoch                                      | 39s/epoch                                        | 29s/epoch
tf_flowers    | 22s/epoch                                       | 7s/epoch                                         | 5s/epoch

It can be seen that model training speed improves greatly with MirroredStrategy. When all graphics cards have similar performance, the training time is roughly inversely proportional to the number of graphics cards.

MirroredStrategy works in the following steps:

  1. Before training, the strategy copies a complete model onto each of the N computing devices;
  2. Each time a batch of data is passed in for training, the data is split into N parts and sent to the N computing devices respectively (i.e. data parallelism);
  3. The N computing devices use local variables (mirrored variables) to compute gradients on their own portion of the data;
  4. Using the all-reduce operation from distributed computing, the gradients are efficiently exchanged and summed across the computing devices, so that each device ends up with the sum of all devices' gradients;
  5. The local variables (mirrored variables) are updated with the summed gradients;
  6. Once all devices have updated their local variables, the next round of training proceeds (i.e. the parallel strategy is synchronous).

By default, the MirroredStrategy strategy in TensorFlow uses NVIDIA NCCL for the all-reduce operation.
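To make these steps concrete, here is a minimal sketch of a custom training loop under MirroredStrategy. The tiny Dense model, the random data, and the helper names step_fn / train_step are illustrative assumptions rather than part of the example above; strategy.run executes the step on every replica, the gradient all-reduce happens inside optimizer.apply_gradients, and strategy.reduce aggregates the per-replica losses:

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
global_batch_size = 64

with strategy.scope():
    # Step 1: the model (and its variables) is mirrored onto every device
    model = tf.keras.Sequential([tf.keras.layers.Dense(2)])
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Step 2: the distributed dataset splits each global batch across the replicas
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random.normal([256, 8]),
     tf.random.uniform([256], maxval=2, dtype=tf.int32))
).batch(global_batch_size)
dist_dataset = strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            logits = model(images, training=True)
            per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
                labels, logits, from_logits=True)
            # Scale by the global batch size so that summing over replicas gives the correct loss
            loss = tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
        # Step 3: each replica computes gradients on its own shard of the batch
        grads = tape.gradient(loss, model.trainable_variables)
        # Steps 4-5: apply_gradients all-reduces the gradients across replicas
        # (NCCL by default) and updates every mirrored copy of the variables
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss
    per_replica_losses = strategy.run(step_fn, args=(dist_inputs,))
    # Step 6: aggregate the per-replica losses for logging; training stays synchronous
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

for batch in dist_dataset:
    print(train_step(batch).numpy())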

2, Multi-machine training: MultiWorkerMirroredStrategy

Multi-machine training is similar to single-machine multi-card training: replace MirroredStrategy with MultiWorkerMirroredStrategy, which is suited to multi-machine training. However, because communication between multiple machines is involved, some additional settings are needed. Specifically, the environment variable TF_CONFIG needs to be set, for example:

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:20000", "localhost:20001"]
    },
    'task': {'type': 'worker', 'index': 0}
})

TF_CONFIG consists of cluster and task:

  • cluster describes the structure of the whole multi-machine cluster and the network address (IP + port) of each machine. The value of cluster is the same on every machine;
  • task describes the role of the current machine. For example, {'type': 'worker', 'index': 0} indicates that the current machine is the 0th worker in the cluster (i.e. localhost:20000). The task value must be set separately on each machine (see the sketch below).
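For instance, on the second machine of the two-worker cluster above, the cluster part stays identical and only the task index changes. A minimal sketch, assuming the same addresses as in the example:

import os
import json

# Same cluster description as on worker 0; only 'index' differs on this machine
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:20000", "localhost:20001"]
    },
    'task': {'type': 'worker', 'index': 1}
})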

Once the above is set, run the training code on each machine in turn. The machines that start first will enter a listening state while waiting for the other hosts to connect. When the connections across the whole cluster are established, all machines begin training at the same time.

Please pay attention to the firewall settings on each machine; in particular, the ports used to communicate with other hosts need to be open. In the example above, worker 0 needs to open port 20000 and worker 1 needs to open port 20001.

The training task in the following example is the same as in the previous section, except that it has been migrated to a multi-machine training environment. Suppose we have two machines. First deploy the following program on both of them; the only difference is the task part: on the first machine set {'type': 'worker', 'index': 0}, and on the second machine set {'type': 'worker', 'index': 1}. Next, run the program on the two machines in turn. After the communication is established, the training process starts automatically.

import tensorflow as tf
import tensorflow_datasets as tfds
import os
import json

num_epochs = 5
batch_size_per_replica = 64
learning_rate = 0.001

num_workers = 2
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:20000", "localhost:20001"]
    },
    'task': {'type': 'worker', 'index': 0}  # Change 'index' to 1 on the second machine
})
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
batch_size = batch_size_per_replica * num_workers

def resize(image, label):
    image = tf.image.resize(image, [224, 224]) / 255.0
    return image, label

dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(resize).shuffle(1024).batch(batch_size)

with strategy.scope():
    model = tf.keras.applications.MobileNetV2(weights=None, classes=2)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )

model.fit(dataset, epochs=num_epochs)
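Since the two deployed copies differ only in the worker index, one convenient pattern is to keep a single script and pass the index in from the command line. This is only a sketch of that idea, not something the example above does; the --worker_index flag and the argparse wiring are hypothetical:

import argparse
import json
import os

# Hypothetical flag: run `python train.py --worker_index 0` on the first machine
# and `python train.py --worker_index 1` on the second
parser = argparse.ArgumentParser()
parser.add_argument('--worker_index', type=int, default=0)
args = parser.parse_args()

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:20000", "localhost:20001"]
    },
    'task': {'type': 'worker', 'index': args.worker_index}
})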

In the following tests, we build two virtual machine instances on Google Cloud Platform, each with a single NVIDIA Tesla K80, and compare the training time using one GPU against the training time of distributed training across the two instances. All tests ran for 5 epochs. With a single card, the Batch Size is set to 64. With two machines and one card each, we test a total Batch Size of 64 (32 per machine) and a total Batch Size of 128 (64 per machine).

Dataset       | Single machine, single card (Batch Size 64) | Two machines, one card each (total Batch Size 64) | Two machines, one card each (total Batch Size 128)
cats_vs_dogs  | 1622s                                       | 858s                                              | 755s
tf_flowers    | 301s                                        | 152s                                              | 144s

It can be seen that model training speed again improves greatly. When all machines have similar performance, the training time is roughly inversely proportional to the number of machines.

