Deep Learning: Distributed Custom Training on Multiple GPUs with TensorFlow 2.0

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)

2.0.0-beta1

# Requires the 2.0.0-beta1 release; alpha0 does not support this setup
# tf.debugging.set_log_device_placement(True): print which device each op and variable is placed on
tf.debugging.set_log_device_placement(True)
# List all physical GPUs
gpus = tf.config.experimental.list_physical_devices('GPU')
# Make gpus[0] visible; there is only one physical GPU here, and normally one physical GPU maps to one logical GPU
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
# Partition the physical GPU into logical devices: gpus[0] is split into 2 logical GPUs
# with 1024 MB each, so each logical GPU uses at most 1024 MB at runtime
tf.config.experimental.set_virtual_device_configuration(gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
print(len(gpus))
# 获取所有的逻辑GPU
logical_gpus = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpus))

1
2
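The single physical GPU is deliberately split into two 1024 MB logical GPUs so that the distributed strategy used later has two devices to mirror across. On a machine with several real GPUs this partitioning is unnecessary; a common alternative setup (a minimal sketch, not used in this post) simply enables memory growth on every physical GPU:

# Sketch of the usual setup on a machine with real multiple GPUs (not used here):
# let each physical GPU allocate memory on demand instead of reserving it all upfront.
for gpu in tf.config.experimental.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(gpu, True)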

fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()
x_valid, x_train = x_train_all[:5000], x_train_all[5000:]
y_valid, y_train = y_train_all[:5000], y_train_all[5000:]

print(x_valid.shape, y_valid.shape)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

(5000, 28, 28) (5000,)
(55000, 28, 28) (55000,)
(10000, 28, 28) (10000,)

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
x_valid_scaled = scaler.transform(x_valid.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
x_test_scaled = scaler.transform(x_test.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
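Because of the reshape(-1, 1), StandardScaler treats every pixel value as a sample of a single feature, so the whole training set is standardized with one global mean and standard deviation. A minimal equivalent sketch in plain NumPy (the x_train_manual name is only for illustration):

# Equivalent global standardization done by hand (illustration only)
mean = x_train.astype(np.float32).mean()
std = x_train.astype(np.float32).std()
x_train_manual = ((x_train.astype(np.float32) - mean) / std).reshape(-1, 28, 28, 1)
# x_train_manual matches x_train_scaled up to floating-point error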

def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    batch_size_per_replica = 128
    batch_size = batch_size_per_replica * len(logical_gpus)
    # The training loop below re-iterates the dataset once per epoch, so epochs is set to 1 here
    train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
    valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size)
    # Data parallelism: each global batch is split into n shards, one per GPU; gradients are computed per shard and then aggregated
    train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)

Executing op TensorSliceDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ShuffleDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op RepeatDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op BatchDatasetV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op PrefetchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ExperimentalRebatchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ExperimentalAutoShardDataset in device /job:localhost/replica:0/task:0/device:CPU:0
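The log above shows the extra rebatch and auto-shard ops that MirroredStrategy adds around the input pipeline. To see what the distributed dataset actually yields, a quick inspection sketch (assuming each element unpacks into per-replica images and labels, just as inside train_step later on):

# Inspect one element of the distributed dataset (illustration only).
# With 2 logical GPUs and a global batch of 256, each replica should get 128 samples.
for dist_images, dist_labels in train_dataset_distribute:
    # experimental_local_results unwraps a per-replica value into one tensor per GPU
    for local_images in strategy.experimental_local_results(dist_images):
        print(local_images.shape)  # expected: (128, 28, 28, 1)
    break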

with strategy.scope():
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu',
                                  input_shape=(28, 28, 1)))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation='relu'))
    model.add(keras.layers.Dense(10, activation="softmax"))

Executing op RandomUniform in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Sub in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Mul in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Add in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarIsInitializedOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op LogicalNot in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Assert in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op ReadVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarIsInitializedOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op LogicalNot in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op Assert in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op Fill in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1

model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 28, 28, 32)        320       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 32)        9248      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 14, 14, 64)        18496     
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 14, 14, 64)        36928     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 7, 7, 128)         147584    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 3, 3, 128)         0         
_________________________________________________________________
flatten (Flatten)            (None, 1152)              0         
_________________________________________________________________
dense (Dense)                (None, 128)               147584    
_________________________________________________________________
dense_1 (Dense)              (None, 10)                1290      
=================================================================
Total params: 435,306
Trainable params: 435,306
Non-trainable params: 0
_________________________________________________________________
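The parameter counts can be checked by hand: a Conv2D layer with a 3x3 kernel, c_in input channels and c_out filters has 3*3*c_in*c_out + c_out parameters, and a Dense layer with n inputs and m outputs has n*m + m. A quick check of a few rows (illustration only):

print(3 * 3 * 1 * 32 + 32)       # conv2d:   320
print(3 * 3 * 32 * 32 + 32)      # conv2d_1: 9248
print(3 * 3 * 128 * 128 + 128)   # conv2d_5: 147584
print(1152 * 128 + 128)          # dense:    147584
print(128 * 10 + 10)             # dense_1:  1290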

# customized training loop.
# 1. define losses functions
# 2. define function train_step
# 3. define function test_step
# 4. for-loop training loop

with strategy.scope():
    """
    batch_size vs. batch_size / #{gpu}
    e.g. global batch_size = 64 with 4 GPUs -> 16 samples per GPU
    reduction controls how per-sample losses are aggregated. With SUM_OVER_BATCH_SIZE
    each replica would average its loss over only its own 16 samples, but the
    average should be taken over the full global batch of 64:
    GPU0: loss = 20, GPU1: loss = 30, GPU2: loss = 25, GPU3: loss = 26
        -> (20 + 30 + 25 + 26) / 64 = 20/64 + 30/64 + 25/64 + 26/64
    Therefore reduction is set to NONE and the scaling by the global batch size
    is done explicitly with tf.nn.compute_average_loss.
    """
    loss_func = keras.losses.SparseCategoricalCrossentropy(
        reduction=keras.losses.Reduction.NONE)
    def compute_loss(labels, predictions):
        per_replica_loss = loss_func(labels, predictions)
        # divide by the global batch size (the /64 in the example above)
        return tf.nn.compute_average_loss(per_replica_loss, global_batch_size=batch_size)
    
    test_loss = keras.metrics.Mean(name = "test_loss")
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name = 'train_accuracy')
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name = 'test_accuracy')

    optimizer = keras.optimizers.SGD(lr=0.01)

    # train_step is wrapped by distributed_train_step below, so it does not need its own tf.function
    # train_step runs on each GPU; its input is only that replica's share of the batch
    # (e.g. 16 of the 64 samples in the example above), so the loss it computes covers just those samples
    def train_step(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training = True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss
    
    @tf.function
    def distributed_train_step(inputs):
        # The global batch is split evenly across the GPUs; each GPU runs train_step
        # independently on its batch_size / len(gpus) samples
        per_replica_average_loss = strategy.experimental_run_v2(train_step, args = (inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_average_loss, axis=None)  # sum the per-replica losses across GPUs
    
    def test_step(inputs):
        images, labels = inputs
        predictions = model(images)
        t_loss = loss_func(labels, predictions)
        # test_loss is a Mean metric over all per-sample losses, so do not apply
        # compute_average_loss (which would divide by the global batch of 64) here:
        # GPU0: loss = 20, GPU1: loss = 30, GPU2: loss = 25, GPU3: loss = 26 -> (20/16 + 30/16 + 25/16 + 26/16) / 4
        test_loss.update_state(t_loss)  
        test_accuracy.update_state(labels, predictions)
        
    @tf.function
    def distributed_test_step(inputs):
        strategy.experimental_run_v2(test_step, args = (inputs,))

    epochs = 10
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        for x in train_dataset_distribute:
            start_time = time.time()
            total_loss += distributed_train_step(x)
            run_time = time.time() - start_time
            num_batches += 1
            print('\rtotal: %3.3f, num_batches: %d, average: %3.3f, time: %3.3f'
                  % (total_loss, num_batches,total_loss / num_batches, run_time), end = '')
        train_loss = total_loss / num_batches
        for x in valid_dataset_distribute:
            distributed_test_step(x)

        print('\rEpoch: %d, Loss: %3.3f, Acc: %3.3f, Val_Loss: %3.3f, Val_Acc: %3.3f'
              % (epoch + 1, train_loss, train_accuracy.result(), test_loss.result(), test_accuracy.result()))
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()

Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op ReadVariableOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op AddN in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op OptimizeDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ModelDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op AnonymousIteratorV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op MakeIterator in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op IteratorGetNextSync in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op ReadVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op __inference_initialize_variables_1232 in device <unspecified>
Executing op __inference_distributed_train_step_1661 in device <unspecified>
Executing op Add in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op RealDiv in device /job:localhost/replica:0/task:0/device:GPU:0
total: 599.001, num_batches: 214, average: 2.799, time: 0.093Executing op __inference_distributed_train_step_3803 in device <unspecified>
total: 600.244, num_batches: 215, average: 2.792, time: 5.398Executing op DeleteIterator in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op __inference_distributed_test_step_4015 in device <unspecified>
Executing op __inference_distributed_test_step_4249 in device <unspecified>
Executing op DivNoNan in device /job:localhost/replica:0/task:0/device:GPU:0
Epoch: 1, Loss: 2.792, Acc: 0.522, Val_Loss: 0.765, Val_Acc: 0.725
Epoch: 2, Loss: 1.412, Acc: 0.742, Val_Loss: 0.590, Val_Acc: 0.785
Epoch: 3, Loss: 1.138, Acc: 0.791, Val_Loss: 0.482, Val_Acc: 0.832
Epoch: 4, Loss: 0.973, Acc: 0.822, Val_Loss: 0.420, Val_Acc: 0.853
Epoch: 5, Loss: 0.868, Acc: 0.841, Val_Loss: 0.385, Val_Acc: 0.863
Epoch: 6, Loss: 0.798, Acc: 0.853, Val_Loss: 0.364, Val_Acc: 0.868
Epoch: 7, Loss: 0.750, Acc: 0.862, Val_Loss: 0.349, Val_Acc: 0.871
Epoch: 8, Loss: 0.712, Acc: 0.870, Val_Loss: 0.337, Val_Acc: 0.877
Epoch: 9, Loss: 0.682, Acc: 0.875, Val_Loss: 0.328, Val_Acc: 0.879
Epoch: 10, Loss: 0.657, Acc: 0.880, Val_Loss: 0.320, Val_Acc: 0.880
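To make the loss-scaling arithmetic from the docstring concrete, here is a small standalone sketch with dummy numbers (4 replicas, 16 samples each, global batch of 64): summing the per-replica results of tf.nn.compute_average_loss reproduces the plain average over the full global batch, which is exactly what distributed_train_step does with strategy.reduce(SUM).

# Standalone check of the reduction arithmetic (dummy data, illustration only)
global_batch = 64
per_replica_losses = np.random.rand(4, 16).astype(np.float32)  # 4 replicas x 16 per-sample losses

# what compute_loss does on every replica: sum(per-sample losses) / global batch size
replica_contributions = [
    tf.nn.compute_average_loss(l, global_batch_size=global_batch)
    for l in per_replica_losses
]
# what strategy.reduce(ReduceOp.SUM, ...) does: add the per-replica contributions
summed = tf.add_n(replica_contributions)
# the result equals the mean loss over all 64 samples
print(float(summed), per_replica_losses.mean())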
