tensorflow实现去噪自编码器
自编码器(autoencoder):是一种无监督学习算法,主要用于数据降维,或者提取特征。在深度学习中,autoencoder可在训练开始前,提供权重矩阵的初始值。Xaiver Glorot与Yoshua Bengio在一篇论文中指出,如果深度学习模型的权重初始化得太小,那信号将在每层间传递时逐渐缩小而难以产生作用,如果权重初始得太大,那信号将在每层间传递时逐渐放大并导致发散和失效。而Xaiver初始化器做的事情就是让权重矩阵被初始化得不大不小,正好合适。从数学的角度讲,Xaiver就是让权重满足0得均值,同时方差为nin+nout2,分布可以用均匀分布或者高斯分布。
实现主要包括以下几个部分:
1.实现标准得均匀分布的Xaiver初始化器:xavier_init(fan_in,fan_out,constant=1)
def xavier_init(fan_in,fan_out,constant=1):
#fan_in与fan_out是输入输出节点数量,目标是返回Wights
#权重矩阵满足0均值,方差为2/(fan_in+fan_out)
low=-constant*np.sqrt(6.0/(fan_in+fan_out))
high=constant*np.sqrt(6.0/(fan_in+fan_out))
return tf.random_uniform((fan_in,fan_out),minval=low,\
maxval=high,dtype=tf.float32)
2.定义去噪编码的class:AdditiveGaussianNoiseAutocoder(object)
class AdditiveGaussianNoiseAutocoder(object):
#定义去噪自编码的class
def __init__(self,n_input,n_hidden,\
transfer_function=tf.nn.softplus,\
optimizer=tf.train.AdamOptimizer(),scale=0.1):
self.n_input=n_input#输入变量数
self.n_hidden=n_hidden#隐含层节点数
self.transfer=transfer_function#隐含层激活函数
self.scale=tf.placeholder(tf.float32)#高斯噪声系数
self.training_scale=scale#训练系数
network_weights=self._initialize_weights()
self.weights=network_weights
self.x=tf.placeholder(tf.float32,[None,self.n_input])#输入
self.hidden=self.transfer(tf.add(tf.matmul(\
self.x+scale*tf.random_normal((n_input,)),\
self.weights['w1']),self.weights['b1']))#隐含层处理
self.reconstruction=tf.add(tf.matmul(\
self.hidden,self.weights['w2']),\
self.weights['b2'])#重建操作
self.cost=0.5*tf.reduce_sum(tf.pow(tf.subtract(\
self.reconstruction,self.x),2.0))#自编码器的损失函数
self.optimizer=optimizer.minimize(self.cost)#优化器优化self.cost
init=tf.global_variables_initializer()
self.sess=tf.Session()
self.sess.run(init)#初始化全部模型参数
def _initialize_weights(self):#参数初始化函数
all_weights=dict()
all_weights['w1']=tf.Variable(xavier_init(self.n_input,\
self.n_hidden))
all_weights['b1']=tf.Variable(tf.zeros([self.n_hidden],\
dtype=tf.float32))
all_weights['w2']=tf.Variable(tf.zeros([self.n_hidden,\
self.n_input],dtype=tf.float32))
all_weights['b2']=tf.Variable(tf.zeros([self.n_input],\
dtype=tf.float32))
return all_weights
def partial_fit(self,X):#训练函数
cost,opt=self.sess.run((self.cost,self.optimizer),\
feed_dict={self.x:X,self.scale:self.training_scale})
return cost
def calc_total_cost(self,X):#计算cost的函数,用于训练完毕后进行模型训练评测
return self.sess.run(self.cost,feed_dict={self.x:X,\
self.scale:self.training_scale})
def transform(self,X):#返回自编码器隐含层的输出结果
return self.sess.run(self.hidden,feed_dict={self.x:X,\
self.scale:self.training_scale})
def generate(self,hidden=None):#将高阶特征复原为原始数据
if hidden is None:
hidden=np.random.normal(size=self.weights['b1'])
return self.sess.run(self.reconstruction,\
feed_dict={self.hidden:hidden})
def reconstruct(self,X):#整体运行一遍复原操作,包括transform与generate
return self.sess.run(self.reconstruction,feed_dict={self.x:X,\
self.scale:self.training_scale})
def getWeight(self):#获得隐含层权重w1
return self.sess.run(self.weights['w1'])
def getBiases(self):#获得隐含层偏系数b1
return self.sess.run(self.weights['b1'])
3.定义对训练集进行标准化处理的函数:standard_scale(X_train,X_test)
def standard_scale(X_train,X_test):#对训练集测试集进行标准化处理
preprocessor=prep.StandardScaler().fit(X_train)
X_train=preprocessor.transform(X_train)
X_test=preprocessor.transform(X_test)
return X_train,X_test
4.以放回抽样的方式随机取到batch_size大小的block:get_random_block_from_data(data,batch_size)
def get_random_block_from_data(data,batch_size):
#取一个从0到len(data)-batch_size之间的随机整数,
#再以这个随机数作为block的启始位置,顺序取到一个batch_size的数据
#放回抽样,提高数据利用率
start_index=np.random.randint(0,len(data)-batch_size)
return data[start_index:(start_index+batch_size)]
完整代码
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 10:11:34 2019
@author: Administrator
"""
import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import ssl
def xavier_init(fan_in,fan_out,constant=1):
#fan_in与fan_out是输入输出节点数量,目标是返回Wights
#权重满足0均值,方差为2/(fan_in+fan_out)
low=-constant*np.sqrt(6.0/(fan_in+fan_out))
high=constant*np.sqrt(6.0/(fan_in+fan_out))
return tf.random_uniform((fan_in,fan_out),minval=low,\
maxval=high,dtype=tf.float32)
class AdditiveGaussianNoiseAutocoder(object):
#定义去噪自编码的class
def __init__(self,n_input,n_hidden,\
transfer_function=tf.nn.softplus,\
optimizer=tf.train.AdamOptimizer(),scale=0.1):
self.n_input=n_input#输入变量数
self.n_hidden=n_hidden#隐含层节点数
self.transfer=transfer_function#隐含层激活函数
self.scale=tf.placeholder(tf.float32)#高斯噪声系数
self.training_scale=scale#训练系数
network_weights=self._initialize_weights()
self.weights=network_weights
self.x=tf.placeholder(tf.float32,[None,self.n_input])#输入
#隐含层处理,scale*tf.random_normal((n_input,))为噪声部分
self.hidden=self.transfer(tf.add(tf.matmul(\
self.x+scale*tf.random_normal((n_input,)),\
self.weights['w1']),self.weights['b1']))
self.reconstruction=tf.add(tf.matmul(\
self.hidden,self.weights['w2']),\
self.weights['b2'])#重建操作
self.cost=0.5*tf.reduce_sum(tf.pow(tf.subtract(\
self.reconstruction,self.x),2.0))#自编码器的损失函数
self.optimizer=optimizer.minimize(self.cost)#优化器优化self.cost
init=tf.global_variables_initializer()
self.sess=tf.Session()
self.sess.run(init)#初始化全部模型参数
def _initialize_weights(self):#参数初始化函数
all_weights=dict()
all_weights['w1']=tf.Variable(xavier_init(self.n_input,\
self.n_hidden))
all_weights['b1']=tf.Variable(tf.zeros([self.n_hidden],\
dtype=tf.float32))
all_weights['w2']=tf.Variable(tf.zeros([self.n_hidden,\
self.n_input],dtype=tf.float32))
all_weights['b2']=tf.Variable(tf.zeros([self.n_input],\
dtype=tf.float32))
return all_weights
def partial_fit(self,X):#训练函数
cost,opt=self.sess.run((self.cost,self.optimizer),\
feed_dict={self.x:X,self.scale:self.training_scale})
return cost
def calc_total_cost(self,X):#计算cost的函数,用于训练完毕后进行模型训练评测
return self.sess.run(self.cost,feed_dict={self.x:X,\
self.scale:self.training_scale})
def transform(self,X):#返回自编码器隐含层的输出结果
return self.sess.run(self.hidden,feed_dict={self.x:X,\
self.scale:self.training_scale})
def generate(self,hidden=None):#将高阶特征复原为原始数据
if hidden is None:
hidden=np.random.normal(size=self.weights['b1'])
return self.sess.run(self.reconstruction,\
feed_dict={self.hidden:hidden})
def reconstruct(self,X):#整体运行一遍复原操作,包括transform与generate
return self.sess.run(self.reconstruction,feed_dict={self.x:X,\
self.scale:self.training_scale})
def getWeight(self):#获得隐含层权重w1
return self.sess.run(self.weights['w1'])
def getBiases(self):#获得隐含层偏系数b1
return self.sess.run(self.weights['b1'])
ssl._create_default_https_context = ssl._create_unverified_context
mnist=input_data.read_data_sets('MNIST_data',one_hot=True)
def standard_scale(X_train,X_test):#对训练集测试集进行标准化处理
preprocessor=prep.StandardScaler().fit(X_train)
X_train=preprocessor.transform(X_train)
X_test=preprocessor.transform(X_test)
return X_train,X_test
def get_random_block_from_data(data,batch_size):
#取一个从0到len(data)-batch_size之间的随机整数,
#再以这个随机数作为block的启始位置,顺序取到一个batch_size的数据
#放回抽样,提高数据利用率
start_index=np.random.randint(0,len(data)-batch_size)
return data[start_index:(start_index+batch_size)]
X_train,X_test=standard_scale(mnist.train.images,mnist.test.images)
n_samples=int(mnist.train.num_examples)#训练集样本数
training_epochs=20#最大训练轮数
batch_size=128
display_step=1#每display_step显示一次cost
autoencoder=AdditiveGaussianNoiseAutocoder(n_input=784,\
n_hidden=200,\
transfer_function=tf.nn.softplus,\
optimizer=tf.train.AdamOptimizer(learning_rate=0.001),\
scale=0.01)
for epoch in range(training_epochs):
avg_cost=0
total_batch=int(n_samples/batch_size)
for i in range(total_batch):
batch_xs=get_random_block_from_data(X_train,batch_size)
cost=autoencoder.partial_fit(batch_xs)
avg_cost+=cost/n_samples*batch_size
if epoch %display_step==0:
print('Epoch','%04d'%(epoch+1),\
'cost=','{:.9f}'.format(avg_cost))
部分函数注释
tf.random_uniform(shape,minval=0,maxval=None,dtype=tf.float32,seed=None,name=None)
从均匀分布中输出随机值。生成的值在该[minval, maxval)范围内遵循均匀分布。下限 minval包含在范围内,而上限maxval被排除在外。对于浮点数,默认范围是[0,1)。对于整数,至少maxval必须明确地被指定。在整数情况下,随机整数稍有偏差,除非maxval-minval是2的精确幂。对于maxval-minval的值,偏差很小。
shape:一维整数张量或Python数组;输出张量的形状。
minval:dtype类型的0维张量或Python值;生成的随机值范围的下限。默认为0。
maxval:dtype类型的0维张量或Python值;要生成的随机值范围的上限。如果dtype是浮点,则默认为1。
dtype:输出的类型:float16、float32、float64、int32、orint64。
seed:一个 Python 整数。用于为分布创建一个随机种子。
name:操作的名称(可选)。
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 12:35:21 2019
@author: Administrator
"""
import tensorflow as tf
A=tf.random_uniform(shape=(4,1),minval=0,maxval=None,\
dtype=tf.float32,seed=None,name=None)
B=tf.random_uniform(shape=(3,3),minval=0,maxval=None,\
dtype=tf.float32,seed=None,name=None)
C=tf.random_uniform(shape=(1,5),minval=0,maxval=23,\
dtype=tf.int32,seed=None,name=None)
print('A:',A.eval())
print('B:',B.eval())
print('C:',C.eval())
#输出:
#A: [[0.02448559]
# [0.7221812 ]
# [0.15603411]
# [0.4908315 ]]
#B: [[0.45691633 0.8306242 0.24430227]
# [0.90618944 0.23485541 0.00950408]
# [0.51974654 0.51217234 0.6660249 ]]
#C: [[22 18 2 14 3]]
tf.random_normal(shape, mean=0.0, stddev=1.0, dtype=tf.float32, seed=None, name=None)
函数用于从服从指定正态分布的数值中取出指定个数的值。
shape: 输出张量的形状,必选。
mean: 正态分布的均值,默认为0。
stddev: 正态分布的标准差,默认为1.0。
dtype: 输出的类型,默认为tf.float32。
seed: 随机数种子,是一个整数,当设置之后,每次生成的随机数都一样。
name: 操作的名称。
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 12:35:21 2019
@author: Administrator
"""
import tensorflow as tf
A=tf.random_normal(shape=(3,2), mean=0.0, stddev=1.0, dtype=tf.float32, seed=None, name=None)
print(A.eval())
#输出
#array([[-0.3779934 , 0.5889418 ],
# [-0.08279686, -0.21108189],
# [-0.06789723, 0.87856984]], dtype=float32)
sklearn.preprocessing.Standardscaler
将数据按期属性(按列进行)减去其均值,并除以其方差。得到的结果是,对于每个属性/每列来说所有数据都聚集在0附近,方差为1。
使用sklearn.preprocessing.Standardscaler.fit(X_train)可直接对X_train进行训练,并保留相关参数,可直接使用其对象转换测试集数据。
preprocessor=sklearn.preprocessing.Standardscaler.fit(X_train)
X_train=preprocessor.transform(X_train)
X_test=preprocessor.transform(X_test)
numpy.random.randint(low, high=None, size=None, dtype=‘l’)
函数的作用是产生离散均匀分布的整数,范围从低(包括)到高(不包括),即[low, high)。如果没有写参数high的值,则返回[0,low)的值。
low:int;生成的数值最低要大于等于low。(hign = None时,生成的数值要在[0, low)区间内)。
high:int (可选);如果使用这个值,则生成的数值在[low, high)区间。
size:int or tuple of ints(可选);输出随机数的尺寸,比如size = (m * n* k)则输出同规模即m * n* k个随机数。默认是None的,仅仅返回满足要求的单一随机数。
dtype:dtype(可选):想要输出的格式。如int64、int等等。
输出:int or ndarray of ints返回一个随机数或随机数数组。
import numpy as np
a=np.random.randint(14,size=3)
b=np.random.randint(3,20)
print(a,'\n',b)
#输出
#[ 0 8 10]
#7