Tensorflow实现变分自编码器

自编码器一般功能是压缩数据。但是变分自编码器(variational autoencoder,VAE)的主要作用却是可以生成数据。变分自编码器的这种功能可以应用到很多领域,从生成手写文字到合成音乐。
变分自编码器的实现在标准自编码器上增加了一个网络层,该网络层有2个子网络组成:均值网络和方差网络。如下图所示:
Tensorflow实现变分自编码器
变分自编码器的训练过程与标准自编码器不同。首先是在编码器中引入了2个子网络均值和方差。其次是在编码器和解码器之间加入了一个采样层。采样层的输入为均值、方差以及高斯噪声,算法如下:
Tensorflow实现变分自编码器
最后采样层的结果输入到解码器。在进行误差反传的时候,变分自编码器并不是简单的使用mse等损失函数,而是损失函数的基础上增加了Kullback-Leibler散度(KL算法)。增加KL算法的原因是要确定均值和方差子网络是符合正态分布的。
变分自编码器的使用是用正态分布的随机数作为解码器的输入,在输出端就可以得到与输入类似但又不同的结果(比如图像等)。这就是变分自编码器最具吸引力的地方。变分自编码器是目前比较流行的生成模型之一。
变分自编码器之所以可以生成结果,是因为它提取了输入的特征。比如输入的是人脸图片,变分自编码器相当于保留了人脸的基本特征的均值和方差信息。通过调整这些信息(输入正态分布随数),就可以得到不同的人脸图片。
变分自编码器代码:

import os
import tensorflow as tf
from tensorflow import keras
from PIL import Image
from matplotlib import pyplot as plt
from tensorflow.keras import Sequential, layers
import numpy as np

tf.random.set_seed(2322)
np.random.seed(23422)

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert  tf.__version__.startswith('2.')

# 把num张图片保存到一张
def save_images(img, name,num):
    new_im = Image.new('L', (28*num, 28*num))
    index = 0
    for i in range(0, 28*num, 28):
        for j in range(0, 28*num, 28):
            im = img[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)

# 定义超参数
batchsz = 256
lr = 1e-4

# 数据集加载,自编码器不需要标签因为是无监督学习
(x_train, _), (x_test, _) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batchsz * 5).batch(batchsz)
test_db = tf.data.Dataset.from_tensor_slices(x_test)
test_db = test_db.batch(batchsz)

# 搭建模型
z_dim = 10
class VAE(keras.Model):
    def __init__(self,z_dim,units=256):
        super(VAE, self).__init__()
        self.z_dim = z_dim
        self.units = units
        # 编码网络
        self.vae_encoder = layers.Dense(self.units)
        # 均值网络
        self.vae_mean = layers.Dense(self.z_dim)      # get mean prediction
        # 方差网络(均值和方差是一一对应的,所以维度相同)
        self.vae_variance = layers.Dense(self.z_dim)      # get variance prediction

        # 解码网络
        self.vae_decoder = layers.Dense(self.units)
        # 输出网络
        self.vae_out = layers.Dense(784)

    # encoder传播的过程
    def encoder(self, x):
        h = tf.nn.relu(self.vae_encoder(x))
        #计算均值
        mu = self.vae_mean(h)
        #计算方差
        log_var = self.vae_variance(h)

        return  mu, log_var

    # decoder传播的过程
    def decoder(self, z):
        out = tf.nn.relu(self.vae_decoder(z))
        out = self.vae_out(out)

        return out

    def reparameterize(self, mu, log_var):
        eps = tf.random.normal(log_var.shape)

        std = tf.exp(log_var)         # 去掉log, 得到方差;
        std = std**0.5                # 开根号,得到标准差;

        z = mu + std * eps
        return z

    def call(self, inputs):
        mu, log_var = self.encoder(inputs)
        # reparameterizaion trick:最核心的部分
        z = self.reparameterize(mu, log_var)
        # decoder 进行还原
        x_hat = self.decoder(z)

        # Variational auto-encoder除了前向传播不同之外,还有一个额外的约束;
        # 这个约束使得你的mu, var更接近正太分布;所以我们把mu, log_var返回;
        return x_hat, mu, log_var

model = VAE(z_dim,units=128)
model.build(input_shape=(128, 784))
optimizer = keras.optimizers.Adam(lr=lr)

epochs = 30
for epoch in range(epochs):

    for step, x in enumerate(train_db):

        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            # shape
            x_hat, mu, log_var = model(x)

            # 把每个像素点当成一个二分类的问题;
            rec_loss = tf.losses.binary_crossentropy(x, x_hat, from_logits=True)
            rec_loss = tf.reduce_mean(rec_loss)

            # compute kl divergence (mu, var) ~ N(0, 1): 我们得到的均值方差和正太分布的;
            # 链接参考: https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 -mu**2 - tf.exp(log_var))
            kl_div = tf.reduce_mean(kl_div) / batchsz
            loss = rec_loss + 1. * kl_div

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 100 ==0:
            print('\repoch: %3d, step:%4d, kl_div: %5f, rec_loss:%9f' %(epoch, step, float(kl_div), float(rec_loss)),end="")

    num_pic = 9
    # evaluation 1: 从正太分布直接sample;
    z = tf.random.normal((batchsz, z_dim))                              # 从正太分布中sample这个尺寸的
    logits = model.decoder(z)                                           # 通过这个得到decoder
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    logits = x_hat.astype(np.uint8)                                     # 标准的图片格式;
    save_images(logits, 'd:\\vae_images\\sampled_epoch%d.png' %epoch,num_pic)         # 直接sample出的正太分布;

    # evaluation 2: 正常的传播过程;
    x = next(iter(test_db))
    x = tf.reshape(x, [-1, 784])
    x_hat_logits, _, _  = model(x)                       # 前向传播返回的还有mu, log_var
    x_hat = tf.sigmoid(x_hat_logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)                       # 标准的图片格式;
    # print(x_hat.shape)
    save_images(x_hat, 'd:\\vae_images\\rec_epoch%d.png' %epoch,num_pic)

变分自编码器输出结果:
Tensorflow实现变分自编码器
手写数字的生成效果就差一些:
Tensorflow实现变分自编码器

上一篇:Oracle, Red Hat等大厂近期密集发布,加强支持Kubernetes


下一篇:集成学习(上):机器学习基础task2-掌握基本的回归模型