import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn

# Define the model
'''
input_size  – dimensionality of the input features
hidden_size – dimensionality of the hidden state
num_layers  – number of stacked LSTM layers (not to be confused with the unrolled time steps)
'''
class lstm_reg(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=5, num_layers=1):
        super(lstm_reg, self).__init__()
        self.rnn = nn.LSTM(input_size,
                           hidden_size,   # number of hidden units
                           num_layers,    # number of stacked LSTM layers
                           batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # r_out: (batch, seq_len, hidden_size); h_n / h_c: final hidden / cell states
        r_out, (h_n, h_c) = self.rnn(x, None)
        # Predict the next output_size values from the last time step only
        out = self.out(r_out[:, -1, :])
        return out
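
# A minimal shape sanity check (a sketch; this helper is hypothetical and is
# never called by the training code below): a batch of 2 windows of length 100
# with 1 feature should map to a (2, 5) prediction.
def _demo_forward_shape():
    demo_net = lstm_reg(input_size=1, hidden_size=15, output_size=5, num_layers=1)
    demo_batch = torch.zeros(2, 100, 1)   # (batch, time_step, features)
    print(demo_net(demo_batch).shape)     # expected: torch.Size([2, 5])
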
def get_data(look_back, output_size):
    df = pd.read_csv('../你的文件.csv', encoding='gbk')
    df = df[['date', 'value']]
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values(by=['date']).reset_index()[['date', 'value']]
    df = df[['value']]
    max_value = np.max(df['value'].values)
    min_value = np.min(df['value'].values)
    scalar = max_value - min_value
    # Min-max normalize the series to [0, 1] first
    dataset = list(map(lambda x: (x - min_value) / scalar, df['value'].values))
    # Slide a window over the series: look_back points as input,
    # the following output_size points as the target
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back - output_size):
        dataX.append(dataset[i:(i + look_back)])
        dataY.append(dataset[(i + look_back):(i + look_back + output_size)])
    dataX, dataY = np.array(dataX), np.array(dataY)
    # 70/30 chronological train/test split
    train_size = round(len(dataX) * 0.7) - 1
    train_x, train_y = dataX[:train_size], dataY[:train_size]
    test_x, test_y = dataX[train_size:], dataY[train_size:]
    return train_x, train_y, test_x, test_y, min_value, scalar
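
# Windowing illustration (a sketch over a hypothetical toy series; this helper
# is never called by the training code): with look_back=3 and output_size=2,
# consecutive input/target pairs are cut from the series exactly as above.
def _demo_windowing():
    series = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
    look_back, output_size = 3, 2
    for i in range(len(series) - look_back - output_size):
        print(series[i:i + look_back], '->',
              series[i + look_back:i + look_back + output_size])
    # prints: [0.0, 0.1, 0.2] -> [0.3, 0.4]
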
def model(EPOCH=10000, BATCH_SIZE=20, time_step=100,
          hidden_size=15, lambd=0.0001, lr=0.0009, num_layers=10, output_size=5):
    train_x, train_y, test_x, test_y, min_value, scalar = get_data(time_step, output_size)
    # Reshape to (samples, time_step, features) for a batch_first LSTM
    train_x = train_x.reshape(-1, time_step, 1)
    train_y = train_y.reshape(-1, output_size)
    test_x = test_x.reshape(-1, time_step, 1)
    test_y = test_y.reshape(-1, output_size)
    train_x = torch.from_numpy(train_x)
    train_y = torch.from_numpy(train_y)
    test_x = torch.from_numpy(test_x)
    net = lstm_reg(1, hidden_size, num_layers=num_layers, output_size=output_size)
    criterion = nn.MSELoss()  # mean squared error
    # Adam with L2 regularization (weight_decay=lambd) and the chosen learning rate
    optimizer = torch.optim.Adam(net.parameters(), weight_decay=lambd, lr=lr)
    # Training loop: mini-batches sliced sequentially from the training set
    for epoch in range(EPOCH):
        for i in range(0, len(train_x), BATCH_SIZE):
            var_x = train_x[i:i + BATCH_SIZE]
            var_y = train_y[i:i + BATCH_SIZE]
            out = net(var_x.float())              # forward pass
            loss = criterion(out, var_y.float())  # training error
            optimizer.zero_grad()                 # clear accumulated gradients
            loss.backward()                       # backpropagation
            optimizer.step()
        if epoch % 10 == 0:
            # Evaluate on the test set without building a computation graph
            with torch.no_grad():
                test_out = net(test_x.float())
                test_loss = criterion(test_out, torch.from_numpy(test_y).float())
            print('Epoch:{}, Loss:{:.5f} test_Loss:{:.5f}'.format(epoch, loss.item(), test_loss.item()))
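    # Batching alternative (a sketch, not used above): torch.utils.data.DataLoader
    # can reshuffle the windows every epoch instead of the fixed sequential slices.
    # from torch.utils.data import TensorDataset, DataLoader
    # loader = DataLoader(TensorDataset(train_x.float(), train_y.float()),
    #                     batch_size=BATCH_SIZE, shuffle=True)
    # for var_x, var_y in loader:
    #     ...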
    # Save the trained model (pickles the whole module object)
    torch.save(net, './多层模型测试.tar')
    # Load it back later with:
    # net = torch.load('./多层模型测试.tar')
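    # Alternative (a sketch; the .pth path is hypothetical): saving only the
    # state_dict avoids pickling the lstm_reg class itself, but the network must
    # be re-instantiated with the same hyperparameters before loading.
    # torch.save(net.state_dict(), './多层模型测试.pth')
    # net = lstm_reg(1, hidden_size, num_layers=num_layers, output_size=output_size)
    # net.load_state_dict(torch.load('./多层模型测试.pth'))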
    # Predict on the training set and map predictions back to the original scale
    train_y_hat = net(train_x.float()).data.numpy().reshape(-1, output_size)
    train_y_hat_restore = train_y_hat * scalar + min_value
    train_y_restore = (train_y * scalar + min_value).data.numpy().reshape(-1, output_size)
    # print('train prediction:', train_y_hat_restore)
    # print('train actual:', train_y_restore)
    # Evaluate on the test set and report the error on the original scale
    test_y_out = net(test_x.float())
    test_y_loss = criterion(test_y_out, torch.from_numpy(test_y).float())
    print('test MSE (normalized):', test_y_loss.item())
    test_y_hat = test_y_out.data.numpy().reshape(-1, output_size)
    test_y_hat_restore = test_y_hat * scalar + min_value
    test_y_restore = test_y.reshape(-1, output_size) * scalar + min_value
    # print('test prediction:', test_y_hat_restore)
    # print('test actual:', test_y_restore)
    minus = test_y_hat_restore - test_y_restore
    df = pd.DataFrame({'y': list(test_y_restore.reshape(-1)),
                       'yhat': list(test_y_hat_restore.reshape(-1)),
                       'minus': list(minus.reshape(-1))})
    print(df)
    print('test MSE on the original scale:', np.mean(minus ** 2))
    df.to_csv('./rnn的测试集结果4.csv')
    # Optional visualization of predictions vs. actual values
    # plt.plot(train_y_hat_restore, 'g', label='train_prediction')
    # plt.plot(train_y_restore, 'b', label='train_real')
    # plt.plot(test_y_hat_restore, 'r', label='test_prediction')
    # plt.plot(test_y_restore, 'tan', label='test_real')
    # plt.plot(minus, 'dimgray', label='minus')
    # plt.legend(loc='best')
    # plt.show()


if __name__ == "__main__":
    model()
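
# Forecasting sketch (hypothetical usage, assuming the saved model and a series
# `dataset` normalized exactly as in get_data, with its `scalar`/`min_value`):
# net = torch.load('./多层模型测试.tar')
# recent = np.array(dataset[-100:]).reshape(1, 100, 1)            # time_step = 100
# forecast = net(torch.from_numpy(recent).float()).data.numpy()
# print(forecast * scalar + min_value)                            # original scale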