同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)

同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)


上一集: 使用Gan实现彩色星球图片生成(pytorch版)

1. 描述

数据集使用和上次使用Gan实现彩色星球图片生成(pytorch版)一样的数据集,仍然使用其中的32张星球图片,同时将Batch size设置为32来验证实验效果。
训练图片(32张,来源space engine):
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)

1.1 传统Gan判别器训练结果

在传统Gan判别器中,判别器输出为单个值,表示真假,在训练之后得到的图像如下:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
结论:图像整体轮廓为圆形,但细节区域并不是很清晰。

1.2 马尔可夫(PatchGan)判别器训练结果

PatchGan判别器并不使用线性层,输出为一个矩阵,每个值表示原图中某个区块对应的真假,取平均值作为均衡之后的结果。

注:作为初学者,在这里将矩阵展平为一维矩阵后,使用了等长的一维矩阵作为真值标签然后计算MSELoss,并不能保证这个方法的正确性,属于自己的理解,不过最后的结果还是可观的,如有问题欢迎大佬指正。

单独使用了Patch判别器的训练代码并未单独给出,但相关代码实质都已包含在下面的复合代码中。
单独使用Patch判别器训练之后得到的生成结果如下:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
结论:图像细节变得更加清晰,但是在整体结构上却不再是标准的圆形。由于Patch判别器关注局部的原因,可以明显发现图像由多个半径不同的圆弧构成,导致出现了漩涡状边缘。

1.3 同时使用两种判别器

在前两次实验的基础上,决定尝试同时使用两种判别器,单独更新两个判别器之后,用两个判别器loss的和来更新生成器,试图兼顾整体轮廓与局部细节。
实验输入训练图像的像素为264x264,可以通过将config.from_old_model设置为True来读取上一次训练保存的网络与优化器参数,从而在上次训练的结果上继续训练下去,默认100个epoch保存一次,可在config中配置。

2. 代码

依然分为主体代码main.py与模型代码model.py两部分。

2.1 模型代码model.py

import torch
import torch.nn as nn


# 生成器,基于上采样
class G_net(nn.Module):
    def __init__(self):
        super(G_net, self).__init__()
        self.expand = nn.Sequential(
            nn.Linear(128, 2048),
            nn.BatchNorm1d(2048),
            nn.Dropout(0.5),
            nn.LeakyReLU(0.2),
            nn.Linear(2048, 4096),
            nn.BatchNorm1d(4096),
            nn.Dropout(0.5),
            nn.LeakyReLU(0.2),
        )
        self.gen = nn.Sequential(
            # 反卷积扩张尺寸,保持kernel size能够被stride整除来减少棋盘效应
            nn.ConvTranspose2d(64, 128, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(256, 64, kernel_size=6, stride=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(0.2),
            # 尾部添加正卷积压缩减少棋盘效应
            nn.Conv2d(16, 8, kernel_size=5, stride=1, padding=1),
            nn.BatchNorm2d(8),
            nn.LeakyReLU(0.2),
            nn.Conv2d(8, 4, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(4),
            nn.LeakyReLU(0.2),
            nn.Conv2d(4, 3, kernel_size=3, stride=1, padding=1),

            # 将输出约束到[-1,1]
            nn.Tanh()
        )

    def forward(self, img_seeds):
        img_seeds = self.expand(img_seeds)
        # 将线性数据重组为二维图片
        img_seeds = img_seeds.view(-1, 64, 8, 8)
        output = self.gen(img_seeds)
        return output


# 全局判别器,传统gan
class D_net_global(nn.Module):
    def __init__(self):
        super(D_net_global,self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=5, stride=3, padding=1, bias=False),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(512, 16, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(0.2, True),
        )
        self.classifier = nn.Sequential(
            nn.Linear(1024, 1024),
            nn.ReLU(True),
            nn.Linear(1024, 1),
            nn.Sigmoid(),
        )

    def forward(self, img):
        features = self.features(img)
        features = features.view(features.shape[0], -1)
        output = self.classifier(features)
        return output


# 局部判别器,patchgan
class D_net_patch(nn.Module):
    def __init__(self):
        super(D_net_patch,self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, bias=False),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, img):
        features = self.features(img)
        features = features.view(features.shape[0], -1)
        return features


# 返回对应的生成器
def get_G_model(from_old_model, device, model_path):
    model = G_net()
    # 从磁盘加载之前保存的模型参数
    if from_old_model:
        model.load_state_dict(torch.load(model_path))
    # 将模型加载到用于运算的设备的内存
    model = model.to(device)
    return model

# 返回全局判别器的模型
def get_D_model_global(from_old_model, device, model_path):
    model = D_net_global()
    # 从磁盘加载之前保存的模型参数
    if from_old_model:
        model.load_state_dict(torch.load(model_path))
    # 将模型加载到用于运算的设备的内存
    model = model.to(device)
    return model

# 返回局部判别器的模型
def get_D_model_patch(from_old_model, device, model_path):
    model = D_net_patch()
    # 从磁盘加载之前保存的模型参数
    if from_old_model:
        model.load_state_dict(torch.load(model_path))
    # 将模型加载到用于运算的设备的内存
    model = model.to(device)
    return model

2.2 训练代码main.py

from torch.utils.data import Dataset, DataLoader
import time
from torch.optim import AdamW
from model import *
from torchvision.utils import save_image
import random
from torch.autograd import Variable
import os
import cv2
from albumentations import Normalize, Compose, Resize, IAAAdditiveGaussianNoise, GaussNoise
from albumentations.pytorch import ToTensorV2
from apex import amp
import pickle


# ------------------------------------config------------------------------------
class config:
    # 设置种子数,配置是否要固定种子数
    seed = 26
    use_seed = True

    # 配置是否要从磁盘加载之前保存的模型参数继续训练
    from_old_model = False

    # 使用apex加速训练
    use_apex = True

    # 运行多少个epoch之后停止
    epochs = 20000
    # 配置batch size
    batchSize = 32

    # 训练图片输入分辨率
    img_size = 264

    # 配置喂入生成器的随机正态分布种子数有多少维(如果改动,需要在model中修改网络对应参数)
    img_seed_dim = 128

    # 有多大概率在训练判别器D时交换正确图片的标签和伪造图片的标签
    D_train_label_exchange = 0.1

    # 是否在图片中添加噪声
    add_noise = False

    # 定义每张图对应的lable的长度,基于patch判别器结构与输入分辨率计算得出
    patch_label_length = 121

    # 将数据集保存在内存中还是磁盘中
    # 小型数据集可以整个载入内存加快速度
    read_from = "Memory"
    # read_from = "Disk"

    # 保存模型参数文件的路径
    G_model_path = "G_model.pth"
    D_model_global_path = "D_model_global.pth"
    D_model_patch_path = "D_model_patch.pth"

    # 保存优化器参数文件的路径
    G_optimizer_path = "G_optimizer.pth"
    D_optimizer_global_path = "D_optimizer_global.pth"
    D_optimizer_patch_path = "D_optimizer_patch.pth"

    # 保存当前保存模型的历史总计训练epoch数
    epoch_record_path = "epoch_count.pkl"

    # 损失函数
    # 使用均方差损失函数
    criterion = nn.MSELoss()

    # 多少个epoch之后保存一次模型
    save_step = 100

    # ------------------------------------路径配置------------------------------------
    # 数据集来源
    img_path = "train_images/"
    # 输出图片的文件夹路径
    output_path = "output_images/"

    # 如果继续训练,则读取之前进行过多少次epoch的训练
    if from_old_model:
        with open(epoch_record_path, "rb") as file:
            last_epoch_number = pickle.load(file)
    else:
        last_epoch_number = 0


# 固定随机数种子
def seed_all(seed):
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True


if config.use_seed:
    seed_all(seed=config.seed)

# -----------------------------------transforms------------------------------------
def get_transforms(img_size):
    # 缩放分辨率并转换到0-1之间
    return Compose(
        [Resize(img_size, img_size),
         Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5), max_pixel_value=255.0, p=1.0),
         ToTensorV2(p=1.0)]
    )


# 在图像输入鉴别器D之前对其添加随机噪声
# 此处写法参考:https://blog.csdn.net/Mr_Lowbee/article/details/107990345
# 注:在训练过程中实际并没有启用噪声
def add_noise(image, sigma=20):
    if config.add_noise == True:
        sigma = sigma / 255
        noise_img = image + sigma * torch.randn_like(image)
        noise_img = noise_img.clamp(0, 1)
        return noise_img
    else:
        return image


# ------------------------------------dataset------------------------------------
# 从磁盘读取数据的dataset
if config.read_from == "Disk":
    class image_dataset(Dataset):
        def __init__(self, file_list, img_path, transform):
            # files list
            self.file_list = file_list
            self.img_path = img_path
            self.transform = transform

        def __getitem__(self, index):
            image_path = self.img_path + self.file_list[index]
            img = cv2.imread(image_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = self.transform(image=img)['image']
            return img

        def __len__(self):
            return len(self.file_list)

# 从内存读取数据的dataset
elif config.read_from == "Memory":
    class image_dataset(Dataset):
        def __init__(self, file_list, img_path, transform):
            self.imgs = []
            for file in file_list:
                image_path = img_path + file
                img = cv2.imread(image_path)
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                img = transform(image=img)['image']
                self.imgs.append(img)

        def __getitem__(self, index):
            return self.imgs[index]

        def __len__(self):
            return len(self.imgs)


# ------------------------------------main------------------------------------
def main():
    # 如果可以使用GPU运算,则使用GPU,否则使用CPU
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print("Use " + str(device))

    # 创建输出文件夹
    if not os.path.exists(config.output_path):
        os.mkdir(config.output_path)

    # 创建dataset
    # create dataset
    file_list = None
    for path, dirs, files in os.walk(config.img_path, topdown=False):
        file_list = list(files)

    train_dataset = image_dataset(file_list, config.img_path, transform=get_transforms(config.img_size))
    train_loader = DataLoader(dataset=train_dataset, batch_size=config.batchSize, shuffle=True)

    # 从model中获取判别器D和生成器G的网络模型
    # 判别器分为global全局判别器与patch局部判别器
    G_model = get_G_model(config.from_old_model, device, config.G_model_path)
    D_model_global = get_D_model_global(config.from_old_model, device, config.D_model_global_path)
    D_model_patch = get_D_model_patch(config.from_old_model, device, config.D_model_patch_path)

    # 定义G和D的优化器,此处使用AdamW优化器
    G_optimizer = AdamW(G_model.parameters(), lr=3e-4, weight_decay=1e-6)
    D_optimizer_global = AdamW(D_model_global.parameters(), lr=3e-4, weight_decay=1e-6)
    D_optimizer_patch = AdamW(D_model_patch.parameters(), lr=3e-4, weight_decay=1e-6)

    # 如果是读取之前训练的数据,则加载保存的优化器参数
    if config.from_old_model:
        G_optimizer.load_state_dict(torch.load(config.G_optimizer_path))
        D_optimizer_global.load_state_dict(torch.load(config.D_optimizer_global_path))
        D_optimizer_patch.load_state_dict(torch.load(config.D_optimizer_patch_path))

    # 损失函数
    criterion = config.criterion

    # 混合精度加速
    if config.use_apex:
        G_model, G_optimizer = amp.initialize(G_model, G_optimizer, opt_level="O1")
        D_model_global, D_optimizer_global = amp.initialize(D_model_global, D_optimizer_global, opt_level="O1")
        D_model_patch, D_optimizer_patch = amp.initialize(D_model_patch, D_optimizer_patch, opt_level="O1")

    # 记录训练时间
    train_start = time.time()

    # 定义标签,单值标签用于传统判别器,多值标签用于patch判别器
    # 定义真标签,使用标签平滑的策略,全0.9
    real_labels_global = Variable(torch.ones(config.batchSize, 1)-0.1).to(device)
    real_labels_patch = Variable(torch.ones(config.batchSize, config.patch_label_length)-0.1).to(device)

    # 定义假标签,单向平滑,因此不对生成器标签进行平滑处理,全0
    fake_labels_global = Variable(torch.zeros(config.batchSize, 1)).to(device)
    fake_labels_patch = Variable(torch.zeros(config.batchSize, config.patch_label_length)).to(device)

    # 开始训练的每一个epoch
    for epoch in range(config.epochs):
        print("start epoch "+str(epoch+1)+":")
        # 定义一些变量用于记录进度和损失
        batch_num = len(train_loader)
        D_loss_sum_global = 0
        D_loss_sum_patch = 0
        G_loss_sum = 0
        count = 0

        # 从dataloader中提取数据
        for index, images in enumerate(train_loader):
            count += 1
            # 将图片放入运算设备的内存
            images = images.to(device)

            # 记录真假标签是否被交换过
            exchange_labels = False

            # 有一定概率在训练判别器时交换label
            if random.uniform(0, 1) < config.D_train_label_exchange:
                real_labels_global, fake_labels_global = fake_labels_global, real_labels_global
                real_labels_patch, fake_labels_patch = fake_labels_patch, real_labels_patch
                exchange_labels = True

            # 训练判断器D_global
            D_optimizer_global.zero_grad()
            # 将随机的初始数据喂入生成器生成假图像
            img_seeds = torch.randn(config.batchSize, config.img_seed_dim).to(device)
            fake_images = G_model(img_seeds)
            # 用真样本输入判别器
            real_output = D_model_global(add_noise(images))
            # 对于数据集末尾的数据,长度不够一个batch size时需要去除过长的真实标签
            if len(real_labels_global) > len(real_output):
                D_loss_real = criterion(real_output, real_labels_global[:len(real_output)])
            else:
                D_loss_real = criterion(real_output, real_labels_global)
            # 用假样本输入判别器
            fake_output = D_model_global(add_noise(fake_images))
            D_loss_fake = criterion(fake_output, fake_labels_global)
            # 将真样本与假样本损失相加,得到判别器的损失
            D_loss_global = D_loss_real + D_loss_fake
            D_loss_sum_global += D_loss_global.item()
            # 重置优化器
            D_optimizer_global.zero_grad()
            # 用损失更新判别器
            if config.use_apex:
                with amp.scale_loss(D_loss_global, D_optimizer_global) as scaled_loss:
                    scaled_loss.backward()
            else:
                D_loss_global.backward()
            D_optimizer_global.step()

            # 训练判断器D_patch
            D_optimizer_patch.zero_grad()
            # 将随机的初始数据喂入生成器生成假图像
            img_seeds = torch.randn(config.batchSize, config.img_seed_dim).to(device)
            fake_images = G_model(img_seeds)
            # 用真样本输入判别器
            real_output = D_model_patch(add_noise(images))
            # 对于数据集末尾的数据,长度不够一个batch size时需要去除过长的真实标签
            if len(real_labels_patch) > len(real_output):
                D_loss_real = criterion(real_output, real_labels_patch[:len(real_output)])
            else:
                D_loss_real = criterion(real_output, real_labels_patch)
            # 用假样本输入判别器
            fake_output = D_model_patch(add_noise(fake_images))
            D_loss_fake = criterion(fake_output, fake_labels_patch)
            # 将真样本与假样本损失相加,得到判别器的损失
            D_loss_patch = D_loss_real + D_loss_fake
            D_loss_sum_patch += D_loss_patch.item()
            # 重置优化器
            D_optimizer_patch.zero_grad()
            # 用损失更新判别器
            if config.use_apex:
                with amp.scale_loss(D_loss_patch, D_optimizer_patch) as scaled_loss:
                    scaled_loss.backward()
            else:
                D_loss_patch.backward()
            D_optimizer_patch.step()

            # 如果之前交换过真假标签,此时再换回来
            if exchange_labels:
                real_labels_global, fake_labels_global = fake_labels_global, real_labels_global
                real_labels_patch, fake_labels_patch = fake_labels_patch, real_labels_patch

            # 训练生成器G
            # 将随机种子数喂入生成器G生成假数据
            img_seeds = torch.randn(config.batchSize, config.img_seed_dim).to(device)
            fake_images = G_model(img_seeds)
            # 将假数据输入判别器
            fake_output_global = D_model_global(add_noise(fake_images))
            fake_output_patch = D_model_patch(add_noise(fake_images))
            # 将假数据的判别结果与真实标签对比得到损失
            G_loss_global = criterion(fake_output_global, real_labels_global)
            G_loss_patch = criterion(fake_output_patch, real_labels_patch)
            G_loss = G_loss_global + G_loss_patch
            G_loss_sum += G_loss.item()
            # 重置优化器
            G_optimizer.zero_grad()
            # 利用损失更新生成器G
            if config.use_apex:
                with amp.scale_loss(G_loss, G_optimizer) as scaled_loss:
                    scaled_loss.backward()
            else:
                G_loss.backward()
            G_optimizer.step()

            # 打印程序工作进度
            if (index + 1) % 200 == 0:
                print("Epoch: %2d, Batch: %4d / %4d" % (epoch + 1, index + 1, batch_num))

        if (epoch+1) % config.save_step == 0:
            # 在每N个epoch结束时保存模型参数到磁盘文件
            torch.save(G_model.state_dict(), config.G_model_path)
            torch.save(D_model_global.state_dict(), config.D_model_global_path)
            torch.save(D_model_patch.state_dict(), config.D_model_patch_path)
            # 在每N个epoch结束时保存优化器参数到磁盘文件
            torch.save(G_optimizer.state_dict(), config.G_optimizer_path)
            torch.save(D_optimizer_global.state_dict(), config.D_optimizer_global_path)
            torch.save(D_optimizer_patch.state_dict(), config.D_optimizer_patch_path)
            # 保存历史训练总数
            with open(config.epoch_record_path, "wb") as file:
                pickle.dump(config.last_epoch_number + epoch + 1, file, 1)
            # 在每N个epoch结束时输出一组生成器产生的图片到输出文件夹
            img_seeds = torch.randn(config.batchSize, config.img_seed_dim).to(device)
            fake_images = G_model(img_seeds).cuda().data
            # 将假图像缩放到[0,1]的区间
            fake_images = 0.5 * (fake_images + 1)
            fake_images = fake_images.clamp(0, 1)
            # 连接所有生成的图片然后用自带的save_image()函数输出到磁盘文件
            fake_images = fake_images.view(-1, 3, config.img_size, config.img_size)
            save_image(fake_images, config.output_path+str(config.last_epoch_number + epoch + 1)+'.png')

        # 打印该epoch的损失,时间等数据用于参考
        print("D_loss_global:", round(D_loss_sum_global / count, 3))
        print("D_loss_patch:", round(D_loss_sum_patch / count, 3))
        print("G_loss:", round(G_loss_sum / count, 3))
        current_time = time.time()
        pass_time = int(current_time - train_start)
        time_string = str(pass_time // 3600) + " hours, " + str((pass_time % 3600) // 60) + " minutes, " + str(
            pass_time % 60) + " seconds."
        print("Time pass:", time_string)
        print()

    # 运行结束
    print("Done.")


if __name__ == '__main__':
    main()

3. 效果

100个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
1000个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
5000个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
10000个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
15000个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
20000个epoch:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)

25000个epoch:

同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
结论:虽然没有单纯只用传统判别器那么圆滑,仍然可以观察到微小的瑕疵,但至少在整体上的轮廓已经接近球体,而星球表面的细节也变得更加清晰了。

4. 趣图

在第8800个epoch时,右下角出现了非常有趣的结果:
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
同时使用传统Gan判别器和马尔可夫判别器实现彩色星球图片生成(pytorch版)
如果我没记错的话,我是让你学习画星星不是怎么画人脸吧……【抖抖抖
而且这个发型总觉得看起来很眼熟的样子……【微妙

上一篇:03-vue中key的作用和工作原理


下一篇:Adversarial patch camouflage against aerial detection