Deep Residual Networks
What are the innovations of deep residual networks?
- An ultra-deep network architecture built from the proposed Residual block.
- Batch Normalization, whose purpose is to make each dimension of a batch of feature maps follow a distribution with mean 0 and variance 1 (see the sketch after this list).
- 1x1 convolutions inside the residual block (reducing and raising dimensions, as in the Inception design), which greatly cut the number of parameters of a very deep network.
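To make the "mean 0, variance 1 per dimension" claim concrete, here is a minimal sketch (the tensor shape is illustrative, not from the original text) that checks the per-channel statistics after nn.BatchNorm2d:

import torch
import torch.nn as nn

x = torch.randn(8, 16, 32, 32) * 3.0 + 5.0    # a batch of feature maps, deliberately not standardized
bn = nn.BatchNorm2d(16)
bn.train()                                    # normalize with the statistics of the current batch

y = bn(x)
print(y.mean(dim=(0, 2, 3)))   # roughly 0 for every channel
print(y.var(dim=(0, 2, 3)))    # roughly 1 for every channel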
Degradation problem: as network depth increases, vanishing and exploding gradients appear, and simply stacking more layers stops improving accuracy.
The structure of a residual block is shown in the figure.
What differs from earlier architectures is the added shortcut (skip connection), which sums the input with the block output. If the desired mapping of the stacked layers is H(x), the residual network instead fits another mapping F(x) := H(x) - x, so the original mapping becomes F(x) + x. Optimizing the residual mapping F(x) is easier than optimizing the original mapping H(x). Intuitively, this amounts to discarding redundant features and keeping the distinctive ones, which leads to better classification.
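The shortcut is easy to express directly in code; the following minimal sketch (layer sizes are illustrative, not taken from the paper) builds a block whose layers learn F(x) and whose output is F(x) + x:

import torch
import torch.nn as nn

class TinyResidual(nn.Module):
    # output = relu(F(x) + x), where F(x) = conv-bn-relu-conv-bn
    def __init__(self, channels):
        super().__init__()
        self.f = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels))
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.relu(self.f(x) + x)   # the layers only have to fit the residual F(x)

print(TinyResidual(64)(torch.randn(1, 64, 56, 56)).shape)   # torch.Size([1, 64, 56, 56])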
Example: overall architecture of the 34-layer network:
Building blocks of the residual network
The structure is fairly simple; the implementation just stacks these blocks, as in the code below.
import torch.nn as nn
import torch


class BasicBlock(nn.Module):
    # Residual block for the 18- and 34-layer networks: no 1x1 convolutions for reducing or
    # raising dimensions, only 3x3 convolutions. The solid-line version uses an identity
    # shortcut; the dashed version passes a downsample branch.
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        # stride=1 keeps the input and output the same spatial size; stride=2 halves it
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        # the second convolution always keeps the spatial size unchanged
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        out = self.relu(out)
        return out
class Bottleneck(nn.Module):
    # Residual block with the 1x1 -> 3x3 -> 1x1 structure used by the 50-, 101- and 152-layer
    # networks (not by the 34-layer one). The dashed (projection) shortcut uses a 1x1
    # convolution to raise the channel count.
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                 groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()
        width = int(out_channel * (width_per_group / 64.)) * groups

        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
                               kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(width)
        self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
                               kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(width)
        self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel * self.expansion,
                               kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channel * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += identity
        out = self.relu(out)
        return out
class ResNet(nn.Module):
    def __init__(self, block, blocks_num, num_classes=1000, include_top=True,
                 groups=1, width_per_group=64):
        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 64
        self.groups = groups
        self.width_per_group = width_per_group

        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        downsample = None
        # a projection shortcut is needed whenever the spatial size or channel count changes
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel, channel,
                            downsample=downsample,
                            stride=stride,
                            groups=self.groups,
                            width_per_group=self.width_per_group))
        self.in_channel = channel * block.expansion
        # Personal note: this bookkeeping is easy to follow for the 34-layer network; for the
        # 50-layer one the in/out channels look odd at first, because after the first block
        # in_channel becomes channel * expansion while each block still receives `channel`.

        for _ in range(1, block_num):
            layers.append(block(self.in_channel, channel,
                                groups=self.groups,
                                width_per_group=self.width_per_group))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)
        return x
def resnet34(num_classes=1000, include_top=True):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)


def resnet50(num_classes=1000, include_top=True):
    return ResNet(Bottleneck, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)


def resnet101(num_classes=1000, include_top=True):
    return ResNet(Bottleneck, [3, 4, 23, 3], num_classes=num_classes, include_top=include_top)


def resnext50_32x4d(num_classes=1000, include_top=True):
    groups = 32
    width_per_group = 4
    return ResNet(Bottleneck, [3, 4, 6, 3],
                  num_classes=num_classes,
                  include_top=include_top,
                  groups=groups,
                  width_per_group=width_per_group)


def resnext101_32x8d(num_classes=1000, include_top=True):
    groups = 32
    width_per_group = 8
    return ResNet(Bottleneck, [3, 4, 23, 3],
                  num_classes=num_classes,
                  include_top=include_top,
                  groups=groups,
                  width_per_group=width_per_group)
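As a quick sanity check (illustrative only, not part of the original listing), the blocks and factory functions above can be run on dummy inputs to confirm the expected shapes:

# Dashed (projection-shortcut) BasicBlock: 64 -> 128 channels, spatial size halved
ds = nn.Sequential(nn.Conv2d(64, 128, kernel_size=1, stride=2, bias=False),
                   nn.BatchNorm2d(128))
print(BasicBlock(64, 128, stride=2, downsample=ds)(torch.randn(1, 64, 56, 56)).shape)
# torch.Size([1, 128, 28, 28])

# Bottleneck: output channels are out_channel * expansion = 128 * 4 = 512
ds = nn.Sequential(nn.Conv2d(256, 512, kernel_size=1, stride=2, bias=False),
                   nn.BatchNorm2d(512))
print(Bottleneck(256, 128, stride=2, downsample=ds)(torch.randn(1, 256, 56, 56)).shape)
# torch.Size([1, 512, 28, 28])

# Full networks on an ImageNet-sized dummy batch
x = torch.randn(2, 3, 224, 224)
print(resnet34(num_classes=5)(x).shape)   # torch.Size([2, 5])
print(resnet50(num_classes=5)(x).shape)   # torch.Size([2, 5])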
Training the model
Basic training workflow:
- Import packages.
  1.1 Core torch, plus torch.nn and torch.optim for the layers and the optimizer.
  1.2 torchvision.transforms for preprocessing the training data.
  1.3 datasets for handling the training set; ImageFolder reads and processes the images.
  1.4 The model itself: from model import resnet34
  1.5 Paths and files: import os, import json
- Get the training device:
  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
- Preprocess the training and validation data with transforms.Compose:
  For training:
  3.1 Random resized crop: transforms.RandomResizedCrop(224)
  3.2 Random horizontal flip: transforms.RandomHorizontalFlip()
  3.3 Convert to tensor: transforms.ToTensor()
  3.4 Normalize: transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
  For validation (testing):
  3.1 Resize: transforms.Resize(224)
  3.2 Center crop: transforms.CenterCrop(224)
  3.3 Convert to tensor: transforms.ToTensor()
  3.4 Normalize: transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
  data_transform = {
      "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                   transforms.RandomHorizontalFlip(),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
      "val": transforms.Compose([transforms.Resize(224),
                                 transforms.CenterCrop(224),
                                 transforms.ToTensor(),
                                 transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])}
- Read the images with datasets.ImageFolder, whose arguments are root and transform:
  train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                       transform=data_transform["train"])
- Wrap the resulting train_dataset in a DataLoader, i.e. turn it into batched data:
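The loader below uses batch_size and nw (the number of worker processes), which are not defined in the fragment; as an assumption, they could be set beforehand like this:

  batch_size = 16                                # illustrative value
  nw = min(os.cpu_count() or 1, batch_size, 8)   # a common heuristic for num_workers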
  train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                             shuffle=True, num_workers=nw)
  Once the training data has been loaded and preprocessed, it is fed into the network for training.
- Build the network: net = resnet34()
- Load pretrained weights (transfer learning):
  model_weight_path = "./resnet34.pth"
  net.load_state_dict(torch.load(model_weight_path, map_location=device))
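If the target task has a different number of classes than the 1000-class pretrained weights (the prediction script later assumes 5 classes), a common transfer-learning step, shown here as a hedged sketch rather than part of the original script, is to replace the final fully connected layer after loading the weights:

  in_features = net.fc.in_features      # 512 for resnet34
  net.fc = nn.Linear(in_features, 5)    # hypothetical 5-class head for the new task
  net.to(device)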
- Define the loss function:
  loss_function = nn.CrossEntropyLoss()
- Build the optimizer and pass it the parameters that need optimizing:
  params = [p for p in net.parameters() if p.requires_grad]
  optimizer = optim.Adam(params, lr=0.001)
- Start training: choose the number of epochs and the path where the best weights will be saved; after each training epoch, run a validation pass.
  tqdm is a fast, extensible Python progress bar: wrapping any iterator as tqdm(iterator) adds a progress display to long loops.
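A minimal usage example (the epochs variable and the loop body are assumptions, not part of the original script):

  from tqdm import tqdm

  for epoch in range(epochs):
      train_bar = tqdm(train_loader)
      for images, labels in train_bar:
          # ... one training step goes here ...
          train_bar.set_description(f"train epoch [{epoch + 1}/{epochs}]")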
  Put the network in training mode:
  net.train()
- Zero the gradients held by the optimizer:
  optimizer.zero_grad()
- Forward pass:
  logits = net(images.to(device))
- Compute the loss:
  loss = loss_function(logits, labels.to(device))
- Backward pass:
  loss.backward()
- Update the parameters:
  optimizer.step()
- Validate the model:
  net.eval()
- Compute the accuracy from the validation outputs:
  predict_y = torch.max(outputs, dim=1)[1]
  acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
- Save the best weights:
  if val_accurate > best_acc:
      best_acc = val_accurate
      torch.save(net.state_dict(), save_path)
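Putting the fragments above together, one epoch of training plus validation might look like the following sketch (names such as epochs, validate_loader, val_num and save_path are assumptions that mirror the fragments rather than reproduce the original script):

  best_acc = 0.0
  for epoch in range(epochs):
      # training
      net.train()
      for images, labels in tqdm(train_loader):
          optimizer.zero_grad()
          logits = net(images.to(device))
          loss = loss_function(logits, labels.to(device))
          loss.backward()
          optimizer.step()

      # validation
      net.eval()
      acc = 0.0
      with torch.no_grad():
          for val_images, val_labels in validate_loader:
              outputs = net(val_images.to(device))
              predict_y = torch.max(outputs, dim=1)[1]
              acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
      val_accurate = acc / val_num

      # keep only the best weights
      if val_accurate > best_acc:
          best_acc = val_accurate
          torch.save(net.state_dict(), save_path)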
Batch prediction
Using the weights saved in the previous step, run batch prediction; batch prediction is simply a batched forward pass.
import os
import json

import torch
from PIL import Image
from torchvision import transforms

from model import resnet34


def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    data_transform = transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])

    # directory containing the images to predict in batch
    imgs_root = "/.data/imgs"
    assert os.path.exists(imgs_root), f"file: '{imgs_root}' does not exist."
    img_path_list = [os.path.join(imgs_root, i) for i in os.listdir(imgs_root) if i.endswith(".jpg")]

    # mapping from class index to class name
    json_path = './class_indices.json'
    assert os.path.exists(json_path), f"file: '{json_path}' does not exist."
    json_file = open(json_path, "r")
    class_indict = json.load(json_file)

    # build the model and load the trained weights
    model = resnet34(num_classes=5).to(device)
    weights_path = "./resnet34.pth"
    assert os.path.exists(weights_path), f"file: '{weights_path}' does not exist."
    model.load_state_dict(torch.load(weights_path, map_location=device))

    model.eval()
    batch_size = 8
    with torch.no_grad():
        for ids in range(0, len(img_path_list) // batch_size):
            img_list = []
            for img_path in img_path_list[ids * batch_size: (ids + 1) * batch_size]:
                assert os.path.exists(img_path), f"file: '{img_path}' does not exist."
                img = Image.open(img_path)
                img = data_transform(img)
                img_list.append(img)

            # stack the single images into one batch tensor
            batch_img = torch.stack(img_list, dim=0)
            output = model(batch_img.to(device)).cpu()
            predict = torch.softmax(output, dim=1)
            probs, classes = torch.max(predict, dim=1)

            # predicted class and probability for each image in the batch
            for idx, (pro, cla) in enumerate(zip(probs, classes)):
                print("image: {}  class: {}  prob: {:.3}".format(img_path_list[ids * batch_size + idx],
                                                                 class_indict[str(cla.numpy())],
                                                                 pro.numpy()))


if __name__ == '__main__':
    main()