import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
import datetime
from matplotlib import pyplot as plt
import io
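# datasets manages the datasets, layers provides the Dense layers, optimizers provides the optimizers,
# Sequential is the model container, and metrics provides the measures used for testing.
# Preprocess the dataset (applied to every (x, y) sample).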
def preprocess(x, y):
    """Convert one (x, y) sample to the dtypes used for training.

    Args:
        x: Value cast to float32 and divided by 255.
        y: Value cast to int32.

    Returns:
        A tuple ``(x, y)`` holding the scaled float32 value and the int32 value.
    """
    scaled = tf.cast(x, dtype=tf.float32) / 255.
    label = tf.cast(y, dtype=tf.int32)
    return scaled, label
def plot_to_image(figure):
    """Render a matplotlib figure into a TensorFlow image tensor.

    The figure is written to an in-memory PNG, closed, and decoded as a
    4-channel image with a leading axis of size 1 added.

    Args:
        figure: The matplotlib figure to render.

    Returns:
        The decoded PNG as a tensor shaped [1, height, width, 4].
    """
    buf = io.BytesIO()
    # Save the figure that was passed in, not whatever pyplot currently
    # regards as the active figure.
    figure.savefig(buf, format='png')
    # Close the figure so it is not displayed directly in a notebook.
    plt.close(figure)
    # Convert the PNG bytes into a TF image; getvalue() returns the whole
    # buffer regardless of the stream position, so no seek is needed.
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    return tf.expand_dims(image, 0)
def image_grid(images):
    """Draw up to 25 images on a 5x5 matplotlib grid.

    Args:
        images: Indexable collection supporting ``len()`` whose items can be
            passed to ``plt.imshow``. Only the first 25 items are drawn.

    Returns:
        The 10x10-inch matplotlib figure holding the grid.
    """
    figure = plt.figure(figsize=(10, 10))
    # Stop at the number of images actually supplied so that a collection
    # shorter than 25 does not index past its end.
    for i in range(min(25, len(images))):
        plt.subplot(5, 5, i + 1, title='name')
        # Hide ticks and grid lines so only the image is visible in each cell.
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i], cmap=plt.cm.binary)
    return figure
batchsz = 128
(x, y), (x_val, y_val) = datasets.mnist.load_data()

# Training pipeline: preprocess each sample, shuffle, batch, and repeat 10 times.
db = tf.data.Dataset.from_tensor_slices((x, y)).map(preprocess)
db = db.shuffle(60000).batch(batchsz).repeat(10)

# Validation pipeline: no shuffling is needed for testing. drop_remainder=True
# discards a final batch smaller than batchsz, so every batch is full-sized.
db_val = tf.data.Dataset.from_tensor_slices((x_val, y_val)).map(preprocess)
db_val = db_val.batch(batchsz, drop_remainder=True)
# Build a 5-layer fully connected network: 784 -> 256 -> 128 -> 64 -> 32 -> 10.
model = Sequential(
    # Four ReLU hidden layers of decreasing width.
    [layers.Dense(units, activation=tf.nn.relu) for units in (256, 128, 64, 32)]
    # The last layer has no activation; it holds 330 = 32 * 10 + 10 parameters.
    + [layers.Dense(10)]
)
# Create the weights for flattened 28 * 28 inputs with any batch size.
model.build(input_shape=(None, 28 * 28))
# Print the network structure, which is useful for debugging.
model.summary()
# Optimizer update rule: w = w - lr * dw. Passing one list of (gradient, variable)
# pairs is enough to update every weight.
# Adam is just one parameter-optimization method; others include SGD and Momentum.
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and is
# rejected by newer Keras releases.
optimizer = optimizers.Adam(learning_rate=0.01)
# Create a summary writer that data can be fed into; each run gets its own
# timestamped directory under logs/.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = 'logs/{}'.format(current_time)
summary_writer = tf.summary.create_file_writer(log_dir)

# Take the first image of the first training batch and reshape it to
# [1, 28, 28, 1] before logging it.
sample_img = next(iter(db))[0][0]
sample_img = tf.reshape(sample_img, [1, 28, 28, 1])

# Feed that single image to the summary writer.
with summary_writer.as_default():
    tf.summary.image("Training sample:", sample_img, step=0)
for step, (x, y) in enumerate(db):
    # Flatten each image to a 784-vector and one-hot encode the labels.
    x = tf.reshape(x, (-1, 28 * 28))
    y_onehot = tf.one_hot(y, depth=10)

    with tf.GradientTape() as tape:
        logits = model(x)
        per_sample_loss = tf.losses.categorical_crossentropy(
            y_onehot, logits, from_logits=True)
        loss = tf.reduce_mean(per_sample_loss)

    # Compute all gradients and update the parameters in place.
    variables = model.trainable_variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))

    # Report the training loss every 100 steps.
    if step % 100 == 0:
        print(step, 'loss:', float(loss))
        with summary_writer.as_default():
            tf.summary.scalar('train-loss', float(loss), step=step)

    # Evaluation runs only every 500 steps.
    if step % 500 != 0:
        continue

    total_correct = 0.
    total_num = 0.
    for val_x, val_y in db_val:
        val_x = tf.reshape(val_x, (-1, 28 * 28))
        # No gradients are needed during testing; argmax over the logits
        # gives the predicted class index (int64), cast to int32 to match
        # the labels.
        pred = tf.cast(tf.argmax(model(val_x), axis=1), dtype=tf.int32)
        hits = tf.cast(tf.equal(pred, val_y), dtype=tf.int32)
        total_correct += tf.reduce_sum(hits).numpy()
        # Add the number of samples in this batch.
        total_num += val_x.shape[0]

    acc = total_correct / total_num
    print(step, 'test acc:', acc)

    # Use the first 25 images of the last validation batch for visualisation.
    val_images = tf.reshape(val_x[:25], [-1, 28, 28, 1])
    with summary_writer.as_default():
        # Log the accuracy.
        tf.summary.scalar('test-acc', float(acc), step=step)
        # Log the images individually.
        tf.summary.image("val-onebyond-images:", val_images, max_outputs=25, step=step)
        # Log the same images stitched into a single grid figure.
        val_images = tf.reshape(val_images, [-1, 28, 28])
        figure = image_grid(val_images)
        tf.summary.image('val-images:', plot_to_image(figure), step=step)