Implementing CNNs, RNNs (attention-based bidirectional RNN), and their fusion in Keras
Original post: https://blog.csdn.net/xwd18280820053/article/details/80060544
This post applies a CNN and an RNN (and, finally, a combination of the two) to binary classification of time-series data in Keras.
CNN for binary classification of time series
from keras.models import Sequential
from keras.layers import Conv1D, BatchNormalization, Activation, GlobalAveragePooling1D, Dropout, Dense
from keras import metrics

model = Sequential()
model.add(Conv1D(128, 3, padding='same', input_shape=(max_lenth, max_features)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Conv1D(256, 3))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Conv1D(128, 3))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(GlobalAveragePooling1D())   # global average pooling over the time dimension
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])
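A quick smoke test on random data confirms that the shapes line up. This is a sketch, not part of the original post; it assumes max_lenth=23 and max_features=12 were defined before building the model, as in the RNN section below:

import numpy as np

# placeholder data, just to check that the model builds and trains
x_dummy = np.random.rand(32, max_lenth, max_features).astype('float32')
y_dummy = np.random.randint(0, 2, size=(32, 1))

model.summary()                  # inspect the output shape of every layer
model.fit(x_dummy, y_dummy, epochs=1, verbose=0)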
Two-layer RNN for binary classification of time series
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import GRU
import keras
from keras import metrics
from keras import regularizers
from keras.callbacks import EarlyStopping
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold   # sklearn.cross_validation is deprecated
from keras import backend as K
import my_callbacks                                    # user module providing RocAucMetricCallback (sketched below)
from keras.layers.normalization import BatchNormalization
import keras.backend.tensorflow_backend as KTF

max_lenth = 23
max_features = 12
training_iters = 2000
train_batch_size = 800
test_batch_size = 800
n_hidden_units = 64
lr = 0.0003

cb = [
    my_callbacks.RocAucMetricCallback(),   # include it before EarlyStopping!
    EarlyStopping(monitor='roc_auc_val', patience=200, verbose=2, mode='max')
]
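The my_callbacks module is not shown in the original post. A minimal sketch of what its RocAucMetricCallback might look like (the class name and the 'roc_auc_val' log key come from the usage above; everything else is an assumption): it computes ROC AUC on the validation split after every epoch and writes it into the logs so that EarlyStopping can monitor it.

# Hypothetical sketch of my_callbacks.RocAucMetricCallback (not the original code).
from keras.callbacks import Callback
from sklearn.metrics import roc_auc_score

class RocAucMetricCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        logs = logs if logs is not None else {}
        # self.validation_data is populated by Keras when validation_split is used
        x_val, y_val = self.validation_data[0], self.validation_data[1]
        y_pred = self.model.predict(x_val)
        logs['roc_auc_val'] = roc_auc_score(y_val, y_pred)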
model = Sequential()
model.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))
model.add(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
              bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
              bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
              bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=True,   # must be True when stacking recurrent layers
              return_state=False, go_backwards=False, stateful=False, unroll=False))   # input_shape=(max_lenth, max_features),
model.add(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
              bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
              bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
              bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=False,
              return_state=False, go_backwards=False, stateful=False, unroll=False))   # input_shape=(max_lenth, max_features),
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(BatchNormalization())
model.add(keras.layers.core.Activation('sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])

model.fit(x_train, y_train, batch_size=train_batch_size, epochs=training_iters, verbose=2,
          callbacks=cb, validation_split=0.2,
          shuffle=True, class_weight=class_weight, sample_weight=None, initial_epoch=0)   # class_weight built from the labels (see below)

pred_y = model.predict(x_test, batch_size=test_batch_size)
score = roc_auc_score(y_test, pred_y)
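class_weight is referenced in the fit call but never defined in the post. One common way to build it (an assumption, not part of the original code) is from the label frequencies:

# Hypothetical construction of the class_weight dict used in model.fit above.
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

classes = np.unique(y_train)
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weight = dict(zip(classes, weights))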
Bidirectional RNN with attention (here, attention means multiplying the output at every time step by a learned weight and summing the weighted outputs to form the final representation)
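Concretely, the AttentionWithContext layer below follows the attention of Yang et al.: with h_t the RNN output at time step t and W, b, u the layer's trainable weights (u is the context vector),

u_t = \tanh(W h_t + b)
\alpha_t = \frac{\exp(u_t^{\top} u)}{\sum_k \exp(u_k^{\top} u)}
s = \sum_t \alpha_t h_t

The output s is a single feature vector per sample, which is why the layer maps a (samples, steps, features) tensor to (samples, features).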
from keras import backend as K
from keras.layers import Layer
from keras import initializers, regularizers, constraints

def dot_product(x, kernel):
    """
    Wrapper for dot product operation, in order to be compatible with both
    Theano and Tensorflow.
    Args:
        x (): input
        kernel (): weights
    Returns:
    """
    if K.backend() == 'tensorflow':
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)
class AttentionWithContext(Layer):
    """
    Attention operation, with a context/query vector, for temporal data.
    Supports Masking.
    Follows the work of Yang et al. [https://www.cs.cmu.edu/~diyiy/docs/naacl16.pdf]
    "Hierarchical Attention Networks for Document Classification"
    by using a context vector to assist the attention.
    # Input shape
        3D tensor with shape: `(samples, steps, features)`.
    # Output shape
        2D tensor with shape: `(samples, features)`.
    How to use:
    Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
    The dimensions are inferred based on the output shape of the RNN.
    Note: The layer has been tested with Keras 2.0.6
    Example:
        model.add(LSTM(64, return_sequences=True))
        model.add(AttentionWithContext())
        # next add a Dense layer (for classification/regression) or whatever...
    """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.u_regularizer = regularizers.get(u_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.u_constraint = constraints.get(u_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(AttentionWithContext, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight((input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight((input_shape[-1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight((input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

        super(AttentionWithContext, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        uit = dot_product(x, self.W)

        if self.bias:
            uit += self.b

        uit = K.tanh(uit)
        ait = dot_product(uit, self.u)

        a = K.exp(ait)

        # apply mask after the exp; will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases, especially in the early stages of training, the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        return input_shape[0], input_shape[-1]
from keras.layers import Bidirectional   # not imported in the sections above

model = Sequential()
model.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))
model.add(Bidirectional(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                            bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                            bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                            bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=True,   # must be True when stacking recurrent layers
                            return_state=False, go_backwards=False, stateful=False, unroll=False), merge_mode='concat'))
model.add(Bidirectional(GRU(units=n_hidden_units, activation='selu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                            bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                            bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                            bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=True,   # True so AttentionWithContext sees all time steps
                            return_state=False, go_backwards=False, stateful=False, unroll=False), merge_mode='concat'))
model.add(Dropout(0.5))
model.add(AttentionWithContext())
model.add(Dense(1))
model.add(BatchNormalization())
model.add(keras.layers.core.Activation('sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=[metrics.binary_crossentropy])
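The original post does not repeat the training call for this model; it would be the same as for the two-layer GRU model above (assuming the same x_train/y_train, callbacks, and class_weight):

# Same training/evaluation loop as the two-layer GRU model (assumed, not from the post).
model.fit(x_train, y_train, batch_size=train_batch_size, epochs=training_iters, verbose=2,
          callbacks=cb, validation_split=0.2, shuffle=True, class_weight=class_weight)

pred_y = model.predict(x_test, batch_size=test_batch_size)
score = roc_auc_score(y_test, pred_y)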
CNN-RNN fusion
class NonMasking(Layer):
    def __init__(self, **kwargs):
        self.supports_masking = True
        super(NonMasking, self).__init__(**kwargs)

    def build(self, input_shape):
        input_shape = input_shape

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        return x

    def get_output_shape_for(self, input_shape):
        return input_shape
from keras.layers import Flatten, Reshape   # not imported in the sections above

## RNN branch
model_left = Sequential()
model_left.add(keras.layers.core.Masking(mask_value=0., input_shape=(max_lenth, max_features)))   # handles variable-length (zero-padded) sequences
model_left.add(GRU(units=left_hidden_units, activation='relu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                   bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                   bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                   bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=True,   # must be True when stacking recurrent layers
                   return_state=False, go_backwards=False, stateful=False, unroll=False))
model_left.add(GRU(units=left_hidden_units, activation='relu', kernel_initializer='orthogonal', recurrent_initializer='orthogonal',
                   bias_initializer='zeros', kernel_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                   bias_regularizer=None, activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
                   bias_constraint=None, dropout=0.5, recurrent_dropout=0.0, implementation=1, return_sequences=True,
                   return_state=False, go_backwards=False, stateful=False, unroll=False))
model_left.add(NonMasking())   # Flatten() does not support masking, so remove the mask first
model_left.add(Flatten())
## FCN branch
model_right = Sequential()
model_right.add(Conv1D(128, 3, padding='same', input_shape=(max_lenth, max_features)))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(Conv1D(256, 3))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(Conv1D(128, 3))
model_right.add(BatchNormalization())
model_right.add(Activation('relu'))
model_right.add(GlobalAveragePooling1D())
model_right.add(Reshape((1, 1, -1)))   # Reshape followed by Flatten is effectively a no-op here
model_right.add(Flatten())
# Merge is the legacy Keras 1 sequential-merge layer (removed in later Keras 2
# releases); a functional-API equivalent with Concatenate is sketched below.
model = Sequential()
model.add(Merge([model_left, model_right], mode='concat'))
model.add(Dense(128))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(BatchNormalization())
model.add(Activation('sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

model.fit([left_x_train, right_x_train], y_train, batch_size=train_batch_size, epochs=training_iters, verbose=2,
          callbacks=cb, validation_split=0.2,   # cb is already a list, so it is passed directly
          shuffle=True, class_weight=None, sample_weight=None, initial_epoch=0)

pred_y = model.predict([left_x_test, right_x_test], batch_size=test_batch_size)
score = roc_auc_score(y_test, pred_y)
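In current Keras versions the Merge layer no longer exists. A rough functional-API sketch of the same fusion head, assuming model_left and model_right are the two branches defined above and are called on Input tensors (this is an assumption, not the original author's code):

# Functional-API sketch of the CNN-RNN fusion (assumes model_left and model_right
# both take inputs of shape (max_lenth, max_features) as defined above).
from keras.models import Model
from keras.layers import Input, Concatenate, Dense, Dropout, BatchNormalization, Activation

left_in = Input(shape=(max_lenth, max_features))
right_in = Input(shape=(max_lenth, max_features))

left_feat = model_left(left_in)     # RNN branch features
right_feat = model_right(right_in)  # FCN branch features

x = Concatenate()([left_feat, right_feat])
x = Dense(128)(x)
x = Dropout(0.5)(x)
x = Dense(1)(x)
x = BatchNormalization()(x)
out = Activation('sigmoid')(x)

model = Model(inputs=[left_in, right_in], outputs=out)
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])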