序章
Recurrent Neural NetWork (RNN) 用于处理序列数据,序列数据预测模型的特点是某一步的输出不仅依赖于这一步的输入,还依赖于其他步的输入或输出。传统的序列数据机器学习模型有Hidden Markov Model (隐马尔可夫模型)、Conditional Random Field (条件随机场)。近年来,深度学习模型又带来了RNN,标准RNN结构极为简单,只有一个tanh层,其模型结构见图1。
Fig 1 The structure of RNN model
但传统的RNN存在着一个缺点:
当预测信息距离参考信息的时间跨度较远的时候,RNN就容易出现梯度消失的问题,之前的信息逐渐丢失,其保存的信息通常都是短期信息。反向传播过程中,当序列非常长的时候,RNN就会出现梯度消失的问题,导致前面的神经元权重不发生变化,没有训练效果;此外,当每项的偏导数都很大的时候,RNN也会出现梯度爆炸的现象。
LSTM原理
Long short term memory(LSTM)是一种特殊的RNN,可以解决梯度消失的问题,能学习到长期依赖关系。LSTM也是相同的链式结构,但他有四个神经网络层相互作用,模型见图2。
Fig 2 The structure of LSTM model
LSTM的核心是cell state,即贯穿顶部的水平线。它像传送带一样输送着长期信息,见图3。LSTM可以往这条线中添加或是移除信息,这都是由几种门结构控制,包括sigmoid(0不通过,1全部通过)、点乘运算。LSTM把训练信息分成长期信息 Ct 和短期信息 ht ,长期信息可以不受影响地传递很远。
Fig 3 The cell state of LSTM
下面介绍LSTM的核心神经网络层,即四大门。
1. 遗忘门
遗忘门的输入是ht-1、Xt,它通过一个sigmoid激活函数来决定对长期信息 cell state 的保留和删除,随后输出一个介于0-1之间的值与Ct-1点乘,结构见图4。
Fig 4 The forget gate
2. 输入门
下一步是决定应该往 cell state 中添加什么样的信息,由两部分组成。1. sigmoid layer负责决定更新的值 it,主要涉及更新哪些值及更新程度。2. tanh layer负责创建即将加入cell state的 新Ct。结构见图5。
Fig 5 The Input gate
3. 更新门
对以上的信息进行更新,其中 ft 决定长期信息 Ct 的保留程度,it 决定本次更新的值,新Ct决定即将加入cell state的cell value。这里主要负责更新长期记忆 cell state 的信息,结构见图6。
Fig 6 The Update gate
4. 输出门
最后是输出门,首先通过sigmoid层决定输出信息,然后通过tanh层激活cell state 及点乘前者输出来得到当前时刻的预测值ht,结构见图7。
Fig 7 The output gate
综合来看,更新门负责更新长期记忆,cell state;输出门结合当前时刻输入信息和 cell state 预测当前时刻的输出;而遗忘门、输入门则是给更新门打工(帮助更新长期记忆信息cell state)。
实战
-
LSTM模型的Pytorch实现及输入/输出数据说明
nn.LSTM(input_size, hidden_size, num_layer, bias=True, batch_first=False, dropout=0, bidirectional=false) self.lstm(input, (h_o, c_0)) ''' input description 输入数据按如下形式传入 (input, (h_0,c_0)) input: 序列数据格式 (seq, batch_size, input_size) Default: 如果不提供h_0, c_0,那么默认为 0 h_0: 每个元素的初始隐藏状态 (num_layers * num_directions, batch_size, hidden_size) c_0: 每个元素的初始单元格状态 (num_layers * num_directions, batch_size, hidden_size) end ''' class LSTM(nn.Module): def __init__(self, input_size=2, hidden_layer_size=100, output_size=1): super().__init__() self.lstm = nn.LSTM(input_size,hidden_size,num_layer) self.linear = nn.Linear(hidden_size, output_size) def forward(self, input_seq): # lstm处理序列数据,并传递到hidden_cell,输出lstm_out # 输入数据格式:input(seq_len, batch, input_size) # seq_len:每个序列的长度(time_step) # batch_size:... # input_size:输入矩阵特征数(即有几个类型的变量) x, (h_n, c_n) = self.lstm(x) # h_0/c_0默认0 s,b,h = x.size() x = x.reshape(s*b,h) x = self.linear(x) x = x.view(s,b,-1) return predictions # 此处输出seq_len个时刻的输出,可以取索引/也可以用h_n选最后一个值 nn.LSTM(input_size,hidden_size,num_layer) ''' output description 输出数据形式: output, (h_n, c_n) output: (seq_len, batch_size, input_size) 包含所有time step的输出 h_n: 最后一个time_step的输出结果, (num_layers * num_directions, batch, hidden_size) (default: num_directions==1) c_n: 最后一个time_step的cell状态结果,一般用不到 (num_layers * num_directions, batch, hidden_size) 如果lstm双向,那么h_n\c_n第一维加倍 如果lstm不双向则代表各层数对应的c_n/h_n end '''
-
基于LSTM的股票价格预测(多个输入数据特征、每次预测一个time_step)
内容是基于股票的开盘价、关盘价、最高价、最低价、售卖量来预测每天的开盘价。具体是根据每五天的开盘价、关盘价、最高价、最低价、售卖量,预测下一天的开盘价。滑动选取训练集(间隔为1),进行训练和预测。原文取自https://blog.csdn.net/qq_31267769/article/details/111561678?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control,现稍作改动。
这是一个多对一的问题,由5个时间序列的数据(特征数为5)预测下一个时间点的开盘价,对所有数据滑动预测下一个时间点的开盘价。其中模型部分,可以用h_n直接得出最后一个time_step的预测值,也可以用output取最后一个time_step的索引达到同样的目的。代码中特征数、时序长度都可以改动来满足自己的需要,如果想预测得到下面多个时序的值,那么对output需要多取一些值。label也要作相应的改动。
class GetData:
def __init__(self, stock_id, save_path):
self.stock_id = stock_id
self.save_path = save_path
self.data = None
def getData(self):
self.data = ts.get_hist_data(self.stock_id).iloc[::-1]
self.data = self.data[["open", "close", "high", "low", "volume"]]
self.open_min = self.data['open'].min()
self.open_max = self.data["open"].max()
self.data = self.data.apply(lambda x: (x - min(x)) / (max(x) - min(x)))
self.data.to_csv(self.save_path)
return self.data
def process_data(self, n):
if self.data is None:
self.getData()
feature = [
self.data.iloc[i: i + n].values.tolist()
for i in range(len(self.data) - n + 2)
if i + n < len(self.data)
]
label = [
self.data.open.values[i + n]
for i in range(len(self.data) - n + 2)
if i + n < len(self.data)
]
train_x = feature[:500]
test_x = feature[500:]
train_y = label[:500]
test_y = label[500:]
return train_x, test_x, train_y, test_y
class Model(nn.Module):
def __init__(self, n):
super(Model, self).__init__()
self.lstm_layer = nn.LSTM(input_size=n, hidden_size=128, batch_first=True)
self.linear_layer = nn.Linear(in_features=128, out_features=1, bias=True)
def forward(self, x):
out1, (h_n, h_c) = self.lstm_layer(x)
a, b, c = h_n.shape
# 或者用 h_n 输出最后一次time_step 20 X 1
# out2 = self.linear_layer(h_n.reshape(a*b, c))
a, b, c = out1.shape
# 取out的最后一个time_step 100 X 1
out2 = self.linear_layer(out1.reshape(a*b, c))
out2 = out2.view(a,b,-1) # 20 X 5 X 1
out2 = out2[:,-1,:] # 20 X 1
return out2
def train_model(epoch, train_dataLoader, test_dataLoader):
train_loss = 0
test_loss = 0
for _ in range(epoch):
total_train_loss = 0
total_train_num = 0
total_test_loss = 0
total_test_num = 0
for x, y in tqdm(train_dataLoader, desc='Epoch: {}| Train Loss: {}| Test Loss: {}'.format(_, train_loss, test_loss)):
x_num = len(x)
out = model(x)
loss = loss_func(out, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_train_loss += loss.item()
total_train_num += x_num
train_loss = total_train_loss / total_train_num
torch.save(model.state_dict(), 'E:/Tablefile/lstm_.pth')
def test_model(test_dataLoader_):
pred = []
label = []
model_ = Model(5)
model_.load_state_dict(torch.load('E:/Tablefile/lstm_.pth'))
model_.eval()
total_test_loss = 0
total_test_num = 0
for x, y in test_dataLoader_:
x_num = len(x)
out = model_(x)
print('##', out.shape, y.shape)
loss = loss_func(out, y)
total_test_loss += loss.item()
total_test_num += x_num
pred.extend(out.data.squeeze(1).tolist())
label.extend(y.tolist())
test_loss = total_test_loss / total_test_num
return pred, label, test_loss
def plot_img(data, pred):
plt.figure(figsize=(18, 9))
plt.plot(range(len(pred)), pred, color='green')
plt.plot(range(len(data)), data, color='b')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Open', fontsize=18)
plt.legend(('predict', 'real'))
plt.show()
if __name__ == '__main__':
epoch = 10
days_num = 5 # 时序长度,可调整
feature = 5 # 变量特征数,可调整
batch_size = 20
# 数据读取
GD = GetData(stock_id='000963', save_path='E:/Tablefile/data.csv')
x_train, x_test, y_train, y_test = GD.process_data(days_num)
x_train = torch.tensor(x_train).float()
x_test = torch.tensor(x_test).float()
y_train = torch.tensor(y_train).float()
y_test = torch.tensor(y_test).float()
train_data = TensorDataset(x_train, y_train)
train_dataLoader = DataLoader(train_data, batch_size=batch_size)
test_data = TensorDataset(x_test, y_test)
test_dataLoader = DataLoader(test_data, batch_size=batch_size)
# 模型训练
model = Model(feature)
loss_func = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
train_model(epoch, train_dataLoader, test_dataLoader)
# 预测出图
pre, y, test_loss = test_model(test_dataLoader)
pred = [ele * (GD.open_max - GD.open_min) + GD.open_min for ele in pre] # 反归一化
data = [ele * (GD.open_max - GD.open_min) + GD.open_min for ele in y] # 反归一化
plot_img(data, pred)
后记
-
LSTM可以通过输入门、遗忘门、更新门实现 cell state 长时期记忆的更新,通过输出门实现短期的输出,从而避免了RNN在长序列预测中长出现的梯度消失/梯度爆炸现象。
-
可以*设置seq_len(时序长度)、input_size(特征数),输出1个time_step或者是多个time_step的值,有兴趣可以自行改动一下输出的time_step个数及其标签,自行测试。
-
本实战每次使用前5个time_step来预测下一个时刻的值,虽然预测出了较长时段的值,但是却完全依赖于前5个时间点的数据,即所有预测值都有实测值,该实验只有检验意义,而没有预报意义;可在通过前5个时间点预测出新值后,再将新值加入序列中进行循环预测,从而实现完全的预测,如有兴趣可以考虑不断加入预测值进行预报。
-
这里有LSTM的手撕代码,有兴趣可参考:https://blog.csdn.net/jining11/article/details/90675276
参考文献
- https://blog.csdn.net/qq_31267769/article/details/111561678?utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~default-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~default-8.control(实战)
- http://colah.github.io/posts/2015-08-Understanding-LSTMs/(LSTM理解)
- https://blog.csdn.net/qq_16949707/article/details/84845552(LSTM理解)
- https://www.cnblogs.com/marsggbo/p/12123755.html(LSTM输入/输出)