Compared with the earlier, rather convoluted version, this torch implementation is noticeably simpler, though that is partly because the logic of the previous code was overly complicated.
1. PyTorch Implementation
# Author : hellcat
# Time   : 18-3-2
"""
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import numpy as np
np.set_printoptions(threshold=np.inf)

import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
"""

import torch as t
import torch.nn as nn
from torch.nn import functional as F


class ResidualBlock(nn.Module):
    def __init__(self, inchannel, outchannel, stride=1, shortcut=None):
        super(ResidualBlock, self).__init__()
        # Left branch: conv-BN-ReLU-conv-BN
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, 3, 1, 1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        # Right branch: identity when shortcut is None, otherwise a projection
        self.right = shortcut

    def forward(self, x):
        out = self.left(x)
        residual = x if self.right is None else self.right(x)
        out += residual
        return F.relu(out)


class ResNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        # Stem: 7x7/2 conv + 3x3/2 max-pool, 224 -> 56
        self.pre = nn.Sequential(
            nn.Conv2d(3, 64, 7, 2, 3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        # ResNet34 stage layout: 3, 4, 6, 3 blocks
        self.layer1 = self._make_layer(inchannel=64, outchannel=128, block_num=3)
        self.layer2 = self._make_layer(inchannel=128, outchannel=256, block_num=4, stride=2)
        self.layer3 = self._make_layer(inchannel=256, outchannel=512, block_num=6, stride=2)
        self.layer4 = self._make_layer(inchannel=512, outchannel=512, block_num=3, stride=2)

        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, inchannel, outchannel, block_num, stride=1):
        # 1x1 projection shortcut for the first block of each stage
        shortcut = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, 1, stride, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        layers = []
        layers.append(ResidualBlock(inchannel, outchannel, stride, shortcut))
        for i in range(1, block_num):
            layers.append(ResidualBlock(outchannel, outchannel))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.pre(x)     # [1, 64, 56, 56]
        x = self.layer1(x)  # [1, 128, 56, 56]
        x = self.layer2(x)  # [1, 256, 28, 28]
        x = self.layer3(x)  # [1, 512, 14, 14]
        x = self.layer4(x)  # [1, 512, 7, 7]
        x = F.avg_pool2d(x, 7)
        x = x.view(x.size(0), -1)
        return self.fc(x)


def hook(module, inputdata, output):
    """Copy this layer's output into features."""
    print("Hook output:", output.data.size())


module = ResNet()
img = t.autograd.Variable(t.randn(1, 3, 224, 224))
handle = module.pre[0].register_forward_hook(hook)
out = module(img)
handle.remove()
print(out)
In the code above we register a hook to inspect an intermediate output. As the hook shows, with padding=3 the 7x7 stride-2 stem convolution behaves like TensorFlow's SAME mode (PyTorch does not pad by default, so padding=(k-1)//2 has to be given explicitly), and the output spatial size is in/stride = 224/2 = 112, consistent with TensorFlow:
torch.Size([1, 64, 112, 112])
Variable containing:
0.6336 -0.5863 0.6472 ... -0.4694 0.1808 0.2837
[torch.FloatTensor of size 1x1000]
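The hook's docstring mentions copying the layer's output into features; as a minimal sketch of that pattern (my extension of the code above, reusing module and img, with a hypothetical features list and save_hook function):

features = []

def save_hook(module, inputdata, output):
    # Keep the intermediate feature map itself, not just its size
    features.append(output.data)

handle = module.pre[0].register_forward_hook(save_hook)
_ = module(img)
handle.remove()
print(features[0].size())  # torch.Size([1, 64, 112, 112])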
2. TensorFlow Implementation
Following the same logic, the TensorFlow implementation of ResNet34 is given below. It uses the helper module ops, which was introduced earlier; its conv2d wrapper has been lightly revised so that the bias can be dropped (i.e., no bias term is added after the convolution). A usage sketch of this option follows the wrapper listing further below.
# Author : hellcat
# Time   : 18-3-7
"""
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import numpy as np
np.set_printoptions(threshold=np.inf)
"""

import ops
import tensorflow as tf

config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)


def ResidualBlock(x,
                  outchannel, stride=1, shortcut=None,
                  train=True, name="ResidualBlock"):
    with tf.variable_scope(name):
        conv1 = ops.conv2d(x, outchannel,
                           k_h=3, k_w=3,
                           s_h=stride, s_w=stride, scope="conv1")
        bn1 = tf.nn.relu(ops.batch_normal(conv1, train=train, scope="bn1"))
        conv2 = ops.conv2d(bn1, outchannel,
                           k_h=3, k_w=3,
                           s_h=1, s_w=1,
                           with_bias=False, scope="conv2")
        left = ops.batch_normal(conv2, train=train, scope="bn2")
        right = x if shortcut is None else shortcut(x)
        return tf.nn.relu(left + right)


class ResNet():
    def __init__(self):
        with tf.variable_scope("input"):
            x = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3])
        with tf.variable_scope("pre"):
            conv = ops.conv2d(x, output_dim=64,
                              k_h=7, k_w=7,
                              s_h=2, s_w=2,
                              with_bias=False)
            bn = tf.nn.relu(ops.batch_normal(conv))
            pool = tf.nn.max_pool(bn, ksize=[1, 3, 3, 1],
                                  strides=[1, 2, 2, 1], padding='SAME')
        with tf.variable_scope("layer1"):
            layer1 = self._make_layer(pool, outchannel=128, block_num=3)
        with tf.variable_scope("layer2"):
            layer2 = self._make_layer(layer1, outchannel=256, block_num=4, stride=2)
        with tf.variable_scope("layer3"):
            layer3 = self._make_layer(layer2, outchannel=512, block_num=6, stride=2)
        with tf.variable_scope("layer4"):
            layer4 = self._make_layer(layer3, outchannel=512, block_num=3, stride=2)
        # Tensor("layer1/ResidualBlock2/Relu_1:0", shape=(1, 56, 56, 128), dtype=float32)
        # Tensor("layer2/ResidualBlock3/Relu_1:0", shape=(1, 28, 28, 256), dtype=float32)
        # Tensor("layer3/ResidualBlock5/Relu_1:0", shape=(1, 14, 14, 512), dtype=float32)
        # Tensor("layer4/ResidualBlock2/Relu_1:0", shape=(1, 7, 7, 512), dtype=float32)
        pool = tf.nn.avg_pool(layer4, ksize=[1, 7, 7, 1],
                              strides=[1, 7, 7, 1], padding='SAME')
        reshape = tf.reshape(pool, [layer4.get_shape()[0], -1])
        self.fc = ops.linear(reshape, 1000)

    def __call__(self, *args, **kwargs):
        return self.fc

    def _make_layer(self, x,
                    outchannel,
                    block_num, stride=1):
        def shortcut(input_):
            with tf.variable_scope("shortcut"):
                conv = ops.conv2d(input_, output_dim=outchannel,
                                  k_w=1, k_h=1, s_w=stride, s_h=stride,
                                  with_bias=False)
                return ops.batch_normal(conv)

        x = ResidualBlock(x, outchannel, stride,
                          shortcut, name="ResidualBlock0")
        for i in range(1, block_num):
            x = ResidualBlock(x, outchannel,
                              name="ResidualBlock{}".format(i))
        return x


if __name__ == "__main__":
    resnet = ResNet()
    print(resnet())
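Note that print(resnet()) only shows the graph tensor; nothing is actually computed. As a hedged sketch (not part of the original script), the graph could be evaluated as follows, continuing from the __main__ block above. Fetching the placeholder by the name "input/Placeholder:0" assumes it is the first unnamed placeholder created under the "input" scope, and the existing resnet is reused because a second ResNet() would trigger tf.get_variable reuse errors:

import numpy as np

# Assumption: the placeholder was created unnamed inside scope "input"
x = tf.get_default_graph().get_tensor_by_name("input/Placeholder:0")
sess.run(tf.global_variables_initializer())
logits = sess.run(resnet(),
                  feed_dict={x: np.random.randn(1, 224, 224, 3).astype(np.float32)})
print(logits.shape)  # (1, 1000)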
The revised conv2d wrapper in ops.py is as follows:
def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
           scope="conv2d", with_w=False, with_bias=True):
    """
    Convolution layer wrapper
    :param input_:
    :param output_dim: number of output feature maps
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :param with_bias: whether to include a bias term
    :return:
    """
    with tf.variable_scope(scope):
        w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim], initializer=tf.constant_initializer(0.0))
            conv = tf.reshape(tf.nn.bias_add(conv, biases), conv.get_shape())
        else:
            biases = None

        if with_w:
            return conv, w, biases
        else:
            return conv
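As a quick hedged usage sketch (not in the original post), the effect of with_bias shows up in the trainable variables the wrapper creates; the "demo_*" scope names here are made up for illustration:

x = tf.placeholder(tf.float32, [1, 32, 32, 3])
conv_b = ops.conv2d(x, 16, scope="demo_with_bias")                  # creates 'w' and 'biases'
conv_nb = ops.conv2d(x, 16, with_bias=False, scope="demo_no_bias")  # creates 'w' only
print([v.name for v in tf.trainable_variables() if "demo" in v.name])
# ['demo_with_bias/w:0', 'demo_with_bias/biases:0', 'demo_no_bias/w:0']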
The output of the full script is as follows:
Tensor("Linear/add:0", shape=(1, 1000), dtype=float32)
Appendix:
The state of ops.py in its latest version as of this article's publication:
# Author : hellcat
# Time   : 18-1-21
# Usage  : network layer wrappers

"""
conv2d
deconv2d
lrelu
linear
"""

import tensorflow as tf


# def batch_normal(x, train=True, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#     return tf.contrib.layers.batch_norm(x,
#                                         decay=decay,
#                                         updates_collections=None,
#                                         epsilon=epsilon,
#                                         scale=True,
#                                         is_training=train,
#                                         scope=scope)


def batch_normal(x, epsilon=1e-5, momentum=0.9, train=True, scope='batch_norm'):
    with tf.variable_scope(scope):
        return tf.contrib.layers.batch_norm(x,
                                            decay=momentum,
                                            updates_collections=None,
                                            epsilon=epsilon,
                                            scale=True,
                                            is_training=train)
'''
Note: when training, the moving_mean and moving_variance need to be updated.
By default the update ops are placed in `tf.GraphKeys.UPDATE_OPS`, so they
need to be added as a dependency to the `train_op`. For example:

```python
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    train_op = optimizer.minimize(loss)
```

One can set updates_collections=None to force the updates in place, but that
can have a speed penalty, especially in distributed settings.
'''

# class batch_norm(object):
#     def __init__(self, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#         with tf.variable_scope(scope):
#             self.epsilon = epsilon
#             self.decay = decay
#             # self.scope = scope
#
#     def __call__(self, x, scope, train=True):
#         return tf.contrib.layers.batch_norm(x,
#                                             decay=self.decay,
#                                             updates_collections=None,
#                                             epsilon=self.epsilon,
#                                             scale=True,
#                                             is_training=train,
#                                             scope=scope)


def concat(tensor_a, tensor_b):
    """
    Concatenate two tensors; note that tensor_a's height/width should be
    greater than or equal to tensor_b's (the larger one is center-cropped)
    :param tensor_a: the leading tensor
    :param tensor_b: the trailing tensor
    :return:
    """
    if tensor_a.get_shape().as_list()[1] > tensor_b.get_shape().as_list()[1]:
        return tf.concat([tf.slice(tensor_a,
                                   begin=[0, (int(tensor_a.shape[1]) - int(tensor_b.shape[1])) // 2,
                                          (int(tensor_a.shape[1]) - int(tensor_b.shape[1])) // 2, 0],
                                   size=[int(tensor_b.shape[0]), int(tensor_b.shape[1]),
                                         int(tensor_b.shape[2]), int(tensor_a.shape[3])],
                                   name='slice'),
                          tensor_b],
                         axis=3, name='concat')
    elif tensor_a.get_shape().as_list()[1] < tensor_b.get_shape().as_list()[1]:
        return tf.concat([tensor_a,
                          tf.slice(tensor_b,
                                   begin=[0, (int(tensor_b.shape[1]) - int(tensor_a.shape[1])) // 2,
                                          (int(tensor_b.shape[1]) - int(tensor_a.shape[1])) // 2, 0],
                                   size=[int(tensor_a.shape[0]), int(tensor_a.shape[1]),
                                         int(tensor_a.shape[2]), int(tensor_b.shape[3])],
                                   name='slice')],
                         axis=3, name='concat')
    else:
        return tf.concat([tensor_a, tensor_b], axis=3)


def conv_cond_concat(x, y):
    """
    Broadcast and concatenate, used in ac_gan to append labels to feature maps
    :param x: features, e.g. shape [n,16,16,128]
    :param y: labels with expanded dims, e.g. shape [n,1,1,10]
    :return: concatenated features, e.g. [n,16,16,138]
    """
    x_shapes = x.get_shape()
    y_shapes = y.get_shape()
    return tf.concat([x, y * tf.ones([x_shapes[0], x_shapes[1], x_shapes[2], y_shapes[3]])], axis=3)


def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
           scope="conv2d", with_w=False, with_bias=True):
    """
    Convolution layer wrapper
    :param input_:
    :param output_dim: number of output feature maps
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :param with_bias: whether to include a bias term
    :return:
    """
    with tf.variable_scope(scope):
        w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim], initializer=tf.constant_initializer(0.0))
            conv = tf.reshape(tf.nn.bias_add(conv, biases), conv.get_shape())
        else:
            biases = None

        if with_w:
            return conv, w, biases
        else:
            return conv


def deconv2d(input_, output_shape,
             k_h=5, k_w=5, s_h=2, s_w=2, stddev=0.02,
             scope="deconv2d", with_w=False):
    """
    Transposed convolution wrapper
    :param input_:
    :param output_shape: output shape
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :return:
    """
    with tf.variable_scope(scope):
        # filter : [height, width, output_channels, in_channels]
        w = tf.get_variable('w', [k_h, k_w, output_shape[-1], input_.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))

        try:
            deconv = tf.nn.conv2d_transpose(input_, w, output_shape=output_shape,
                                            strides=[1, s_h, s_w, 1])
        # Support for versions of TensorFlow before 0.7.0
        except AttributeError:
            deconv = tf.nn.deconv2d(input_, w, output_shape=output_shape,
                                    strides=[1, s_h, s_w, 1])

        biases = tf.get_variable('biases', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
        deconv = tf.reshape(tf.nn.bias_add(deconv, biases), deconv.get_shape())

        if with_w:
            return deconv, w, biases
        else:
            return deconv


def lrelu(x, leak=0.2):
    """
    Leaky ReLU wrapper
    :param x:
    :param leak:
    :return:
    """
    return tf.maximum(x, leak*x)


def linear(input_, output_size,
           stddev=0.02, bias_start=0.0,
           scope=None, with_w=False):
    """
    Fully connected layer wrapper
    :param input_:
    :param output_size: number of output units
    :param scope:
    :param stddev:
    :param bias_start: constant value used to initialize the bias
    :param with_w: whether to also return the parameter Variables
    :return:
    """
    shape = input_.get_shape().as_list()

    with tf.variable_scope(scope or "Linear"):
        matrix = tf.get_variable("Matrix", [shape[1], output_size], tf.float32,
                                 tf.random_normal_initializer(stddev=stddev))
        bias = tf.get_variable("bias", [output_size], initializer=tf.constant_initializer(bias_start))
        if with_w:
            return tf.matmul(input_, matrix) + bias, matrix, bias
        else:
            return tf.matmul(input_, matrix) + bias
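To close, a small hedged sketch (mine, not part of the original ops.py) exercising a few of these wrappers, mainly to show concat's center-crop behavior when the spatial sizes differ:

import tensorflow as tf
import ops

a = tf.ones([1, 12, 12, 8])
b = tf.ones([1, 8, 8, 4])
c = ops.concat(a, b)     # a is center-cropped to 8x8, then joined on channels
print(c.get_shape())     # (1, 8, 8, 12)

flat = tf.reshape(c, [1, -1])
logits = ops.linear(flat, 10)  # fully connected layer, default scope "Linear"
print(ops.lrelu(logits))       # leaky ReLU with leak=0.2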