import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # suppress TensorFlow's C++ INFO/WARNING logs
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
# datasets manages the built-in datasets, layers provides Dense layers,
# optimizers the optimizers, Sequential the container, metrics the evaluation metrics
(x, y), (x_test, y_test) = datasets.fashion_mnist.load_data()  # load the data
print(x.shape, y.shape)  # (60000, 28, 28) (60000,)
db = tf.data.Dataset.from_tensor_slices((x, y))  # build the dataset; this already yields tensors, so no convert_to_tensor is needed below
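# For illustration: from_tensor_slices slices along the first axis, so db yields
# 60000 (image, label) pairs with shapes (28, 28) and () respectively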
# preprocess each (x, y) sample
def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.  # scale pixels to [0, 1]
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)  # class index -> one-hot vector
    return x, y
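# For illustration (assumed value): tf.one_hot(3, depth=10) produces
# [0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]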
batchsz = 128
db = db.map(preprocess).shuffle(10000).batch(batchsz)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(batchsz)  # the test set does not need shuffling
db_iter = iter(db)
sample = next(db_iter)
print('batch:', sample[0].shape, sample[1].shape)  # inspect one batch; sample[0] is x, sample[1] is y
# (128, 28, 28) (128, 10)
# build a 5-layer fully connected network
model = Sequential([
    layers.Dense(256, activation=tf.nn.relu),  # [b, 784] -> [b, 256]
    layers.Dense(128, activation=tf.nn.relu),  # [b, 256] -> [b, 128]
    layers.Dense(64, activation=tf.nn.relu),   # [b, 128] -> [b, 64]
    layers.Dense(32, activation=tf.nn.relu),   # [b, 64]  -> [b, 32]
    layers.Dense(10)  # [b, 32] -> [b, 10]; no activation on the last layer, raw logits (330 = 32*10 + 10 params)
])
model.build(input_shape=(None, 28 * 28))  # instantiate the weights for the given input shape
model.summary()  # print the network structure; useful for debugging
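# Roughly, summary() should report (layer names may differ between runs):
#   Dense 256: 200,960 params (784*256 + 256)
#   Dense 128:  32,896 params (256*128 + 128)
#   Dense  64:   8,256 params (128*64 + 64)
#   Dense  32:   2,080 params (64*32 + 32)
#   Dense  10:     330 params (32*10 + 10)
#   total:     244,522 trainable params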
# the optimizer implements the update w = w - lr * dw; passing a list of variables updates all of them
optimizer = optimizers.Adam(learning_rate=1e-3)  # Adam is one optimization method; others include SGD and Momentum
def main():
    # training: forward pass and gradient update
    for epoch in range(30):
        # iterate over every batch
        for step, (x, y) in enumerate(db):
            # x: [128, 28, 28], y: [128, 10]
            x = tf.reshape(x, [-1, 28 * 28])  # flatten each image into a 784-vector
            with tf.GradientTape() as tape:
                # forward pass: x [b, 784] => logits [b, 10]
                logits = model(x)
                loss_mse = tf.reduce_mean(tf.losses.MSE(y, logits))  # MSE is computed only for monitoring
                # from_logits=True tells the loss we pass raw logits, not probabilities
                loss_ce = tf.reduce_mean(tf.losses.categorical_crossentropy(y, logits, from_logits=True))
            grads = tape.gradient(loss_ce, model.trainable_variables)  # gradients w.r.t. all trainable weights
            optimizer.apply_gradients(zip(grads, model.trainable_variables))  # update the parameters in place
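            # For intuition only (a sketch, kept commented out): without an optimizer
            # the update above would be a plain gradient-descent step,
            #     for w, dw in zip(model.trainable_variables, grads):
            #         w.assign_sub(1e-3 * dw)  # w = w - lr * dw
            # Adam takes the same kind of step but rescales it with running
            # per-parameter mean/variance estimates.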
            if step % 100 == 0:
                print(epoch, step, 'loss:', float(loss_ce), float(loss_mse))
        # evaluate on the test set after every epoch
        total_correct = 0
        total_num = 0
        for x, y in db_test:
            x = tf.reshape(x, [-1, 28 * 28])
            # no gradients are needed at test time, so no GradientTape here
            logits = model(x)
            # logits => probabilities
            prob = tf.nn.softmax(logits, axis=1)
            # [b, 10] --> [b]
            pred = tf.argmax(prob, axis=1)  # index of the largest probability (int64)
            y = tf.argmax(y, axis=1)  # recover the class index from the one-hot label
            # tf.equal returns a [b] tensor of True/False
            correct = tf.reduce_sum(tf.cast(tf.equal(pred, y), dtype=tf.int32))
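            # Tiny worked example (values assumed for illustration):
            #   pred = [2, 7], y = [2, 3]
            #   tf.equal -> [True, False] -> cast -> [1, 0] -> reduce_sum -> 1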
            total_correct += int(correct)  # correct is a tensor; int() converts it to a plain Python number
            total_num += x.shape[0]  # add this batch's size
        acc = total_correct / total_num
        print(epoch, 'test acc:', acc)
if __name__ == '__main__':  # avoid polluting the global namespace on import
    main()
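# For comparison (a hedged sketch, not part of this tutorial's loop): the same
# model could be trained with Keras' built-in training loop, provided the 28*28
# flattening is moved into preprocess() or a layers.Flatten() is prepended:
#     model.compile(optimizer=optimizer,
#                   loss=keras.losses.CategoricalCrossentropy(from_logits=True),
#                   metrics=['accuracy'])
#     model.fit(db, epochs=30, validation_data=db_test)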
Neural Network Learning 6: Fashion-MNIST in Practice