import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)
2.0.0-beta1
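Before configuring devices, it can help to confirm that TensorFlow actually sees a GPU. A minimal check (a sketch; `tf.test.is_gpu_available` is the TF 2.0-era API and is deprecated in later releases):

# Sketch: confirm a GPU is visible before any device configuration
print(tf.test.is_gpu_available())                            # expect True on a GPU machine
print(tf.config.experimental.list_physical_devices('GPU'))   # expect a non-empty list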
# Requires the 2.0.0-beta1 release; alpha0 does not work here
# tf.debugging.set_log_device_placement(True): print which device each op / variable is placed on
tf.debugging.set_log_device_placement(True)
# List all physical GPUs
gpus = tf.config.experimental.list_physical_devices('GPU')
# Make the GPU visible; there is only one physical GPU here, so pick gpus[0] (normally one physical GPU maps to one logical GPU)
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
# Partition the physical GPU into logical devices: gpus[0] is split into 2 logical GPUs with
# a 1024 MB memory limit each, so each logical device uses < 1024 MB at runtime
tf.config.experimental.set_virtual_device_configuration(gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
print(len(gpus))
# List all logical GPUs
logical_gpus = tf.config.experimental.list_logical_devices('GPU')
print(len(logical_gpus))
1
2
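Note that the visible-device and virtual-device configuration must happen before the GPUs are initialized (i.e. before any op runs on them); otherwise TensorFlow raises a RuntimeError. A defensive variant of the cell above (a sketch using the same APIs, just guarded with try/except):

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        tf.config.experimental.set_virtual_device_configuration(gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
             tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
    except RuntimeError as e:
        # The virtual-device layout cannot be changed once the GPUs are already in use
        print(e)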
fashion_mnist = keras.datasets.fashion_mnist
(x_train_all, y_train_all), (x_test, y_test) = fashion_mnist.load_data()
x_valid, x_train = x_train_all[:5000], x_train_all[5000:]
y_valid, y_train = y_train_all[:5000], y_train_all[5000:]
print(x_valid.shape, y_valid.shape)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
(5000, 28, 28) (5000,)
(55000, 28, 28) (55000,)
(10000, 28, 28) (10000,)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
x_valid_scaled = scaler.transform(x_valid.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
x_test_scaled = scaler.transform(x_test.astype(np.float32).reshape(-1, 1)).reshape(-1, 28, 28, 1)
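The reshape(-1, 1) is needed because StandardScaler expects a 2D array; it treats every pixel as one sample of a single feature, so one global mean and standard deviation are computed over the whole training set. A small equivalence sketch (the x_train_manual name is just for illustration):

# Sketch: what the StandardScaler call above computes, done by hand
mu = x_train.astype(np.float32).mean()
sigma = x_train.astype(np.float32).std()
x_train_manual = ((x_train.astype(np.float32) - mu) / sigma).reshape(-1, 28, 28, 1)
# np.allclose(x_train_manual, x_train_scaled) should hold up to float tolerance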
def make_dataset(images, labels, epochs, batch_size, shuffle=True):
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        dataset = dataset.shuffle(10000)
    dataset = dataset.repeat(epochs).batch(batch_size).prefetch(50)
    return dataset
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    batch_size_per_replica = 128
    batch_size = batch_size_per_replica * len(logical_gpus)
    # The datasets are re-iterated inside the training for-loop below, so epochs is set to 1 here
    train_dataset = make_dataset(x_train_scaled, y_train, 1, batch_size)
    valid_dataset = make_dataset(x_valid_scaled, y_valid, 1, batch_size)
    # Data parallelism: each global batch is split into one shard per GPU; the GPUs compute
    # gradients on their shards and the results are aggregated afterwards
    train_dataset_distribute = strategy.experimental_distribute_dataset(train_dataset)
    valid_dataset_distribute = strategy.experimental_distribute_dataset(valid_dataset)
Executing op TensorSliceDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ShuffleDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op RepeatDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op BatchDatasetV2 in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op PrefetchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ExperimentalRebatchDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op ExperimentalAutoShardDataset in device /job:localhost/replica:0/task:0/device:CPU:0
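Each element of a distributed dataset is a per-replica structure: with 2 logical GPUs and batch_size = 256, every replica receives a shard of 128 samples. A quick inspection sketch (assuming the objects from the cell above and that strategy.experimental_local_results is available in this TF build):

# Sketch: peek at one distributed batch and its per-replica shards
for images, labels in train_dataset_distribute:
    local_images = strategy.experimental_local_results(images)
    print(len(local_images), local_images[0].shape)   # expect: 2 (128, 28, 28, 1)
    break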
with strategy.scope():
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu',
                                  input_shape=(28, 28, 1)))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3,
                                  padding='same',
                                  activation='relu'))
    model.add(keras.layers.MaxPool2D(pool_size=2))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation='relu'))
    model.add(keras.layers.Dense(10, activation='softmax'))
Executing op RandomUniform in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:1
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:1
(... the variable-creation ops repeat for every layer, mirrored on both GPU:0 and GPU:1 ...)
model.summary()
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
conv2d (Conv2D)              (None, 28, 28, 32)        320
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 32)        9248
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 14, 14, 32)        0
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 14, 14, 64)        18496
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 14, 14, 64)        36928
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 7, 7, 64)          0
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 7, 7, 128)         73856
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 7, 7, 128)         147584
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 3, 3, 128)         0
_________________________________________________________________
flatten (Flatten)            (None, 1152)              0
_________________________________________________________________
dense (Dense)                (None, 128)               147584
_________________________________________________________________
dense_1 (Dense)              (None, 10)                1290
=================================================================
Total params: 435,306
Trainable params: 435,306
Non-trainable params: 0
_________________________________________________________________
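As a quick sanity check on the summary, each Conv2D layer has kernel_h * kernel_w * in_channels * filters weights plus filters biases, and each Dense layer has in_features * units weights plus units biases:

# Sketch: reproduce a few of the parameter counts shown above
print(3 * 3 * 1 * 32 + 32)      # 320     conv2d
print(3 * 3 * 32 * 32 + 32)     # 9248    conv2d_1
print(3 * 3 * 64 * 128 + 128)   # 73856   conv2d_4
print(1152 * 128 + 128)         # 147584  dense
print(128 * 10 + 10)            # 1290    dense_1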
# Customized training loop:
# 1. define the loss function
# 2. define the function train_step
# 3. define the function test_step
# 4. run the for-loop training loop
with strategy.scope():
    """
    global batch_size vs. per-replica batch size = batch_size / #gpus
    e.g. global batch: 64, per GPU: 16
    reduction controls how the loss is aggregated. With SUM_OVER_BATCH_SIZE each replica would
    average its loss over its own 16 samples, but the average should be taken over all 64 samples,
    so reduction is set to NONE and compute_average_loss divides by the global batch size:
    GPU0: loss = 20, GPU1: loss = 30, GPU2: loss = 25, GPU3: loss = 26
    -> (20 + 30 + 25 + 26) / 64 -> 20/64 + 30/64 + 25/64 + 26/64
    """
    loss_func = keras.losses.SparseCategoricalCrossentropy(reduction=keras.losses.Reduction.NONE)
    def compute_loss(labels, predictions):
        per_replica_loss = loss_func(labels, predictions)
        # sum of the per-example losses on this replica divided by the global batch size (e.g. / 64)
        return tf.nn.compute_average_loss(per_replica_loss, global_batch_size=batch_size)
    test_loss = keras.metrics.Mean(name="test_loss")
    train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    test_accuracy = keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
    optimizer = keras.optimizers.SGD(lr=0.01)
    # train_step is wrapped by distributed_train_step below, so it does not need its own tf.function
    # train_step runs on each replica; its input is that replica's shard (e.g. 16 samples),
    # so the loss it computes only covers those 16 samples
    def train_step(inputs):
        images, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss
    @tf.function
    def distributed_train_step(inputs):
        # The global batch is split evenly across the GPUs; each GPU runs train_step
        # on its own batch_size / num_gpus samples
        per_replica_average_loss = strategy.experimental_run_v2(train_step, args=(inputs,))
        # Aggregate the per-replica losses (each already divided by the global batch size)
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_average_loss, axis=None)
    def test_step(inputs):
        images, labels = inputs
        predictions = model(images)
        t_loss = loss_func(labels, predictions)
        # test_loss is a Mean metric, so do not divide by the global batch size here:
        # GPU0: loss = 20, GPU1: loss = 30, GPU2: loss = 25, GPU3: loss = 26
        # -> (20/16 + 30/16 + 25/16 + 26/16) / 4
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)
    @tf.function
    def distributed_test_step(inputs):
        strategy.experimental_run_v2(test_step, args=(inputs,))

    epochs = 10
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        # Iterate the distributed dataset so that each replica receives its own shard of every batch
        for x in train_dataset_distribute:
            start_time = time.time()
            total_loss += distributed_train_step(x)
            run_time = time.time() - start_time
            num_batches += 1
            print('\rtotal: %3.3f, num_batches: %d, average: %3.3f, time: %3.3f'
                  % (total_loss, num_batches, total_loss / num_batches, run_time), end='')
        train_loss = total_loss / num_batches
        for x in valid_dataset_distribute:
            distributed_test_step(x)
        print('\rEpoch: %d, Loss: %3.3f, Acc: %3.3f, Val_Loss: %3.3f, Val_Acc: %3.3f'
              % (epoch + 1, train_loss, train_accuracy.result(), test_loss.result(), test_accuracy.result()))
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()
Executing op OptimizeDataset in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op MakeIterator in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op IteratorGetNextSync in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op __inference_distributed_train_step_1661 in device <unspecified>
(... further device-placement logs omitted; variables and updates are mirrored on GPU:0 and GPU:1 ...)
total: 600.244, num_batches: 215, average: 2.792, time: 5.398
Epoch: 1, Loss: 2.792, Acc: 0.522, Val_Loss: 0.765, Val_Acc: 0.725
Epoch: 2, Loss: 1.412, Acc: 0.742, Val_Loss: 0.590, Val_Acc: 0.785
Epoch: 3, Loss: 1.138, Acc: 0.791, Val_Loss: 0.482, Val_Acc: 0.832
Epoch: 4, Loss: 0.973, Acc: 0.822, Val_Loss: 0.420, Val_Acc: 0.853
Epoch: 5, Loss: 0.868, Acc: 0.841, Val_Loss: 0.385, Val_Acc: 0.863
Epoch: 6, Loss: 0.798, Acc: 0.853, Val_Loss: 0.364, Val_Acc: 0.868
Epoch: 7, Loss: 0.750, Acc: 0.862, Val_Loss: 0.349, Val_Acc: 0.871
Epoch: 8, Loss: 0.712, Acc: 0.870, Val_Loss: 0.337, Val_Acc: 0.877
Epoch: 9, Loss: 0.682, Acc: 0.875, Val_Loss: 0.328, Val_Acc: 0.879
Epoch: 10, Loss: 0.657, Acc: 0.880, Val_Loss: 0.320, Val_Acc: 0.880
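A possible follow-up (a sketch, not part of the run above): the held-out test set was scaled but never evaluated. It can be fed through the same distributed test step; test_dataset and test_dataset_distribute are new names introduced here, everything else reuses the objects defined above.

with strategy.scope():
    test_dataset = make_dataset(x_test_scaled, y_test, 1, batch_size, shuffle=False)
    test_dataset_distribute = strategy.experimental_distribute_dataset(test_dataset)
    test_loss.reset_states()
    test_accuracy.reset_states()
    for x in test_dataset_distribute:
        distributed_test_step(x)
    print('Test_Loss: %3.3f, Test_Acc: %3.3f'
          % (test_loss.result(), test_accuracy.result()))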