本实验使用环境为Anaconda3 Jupyter,调用Sklearn包,调用keras包,请提前准备好。
1.引入一些常见包
主要有keras包、numpy包、metrics包、pandas包等。
import csv
import numpy as np
import time
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import explained_variance_score
from sklearn import metrics
from sklearn.svm import SVR
import matplotlib.pyplot as plt
from pandas import DataFrame
from pandas import Series
from pandas import concat
from pandas import read_csv
from pandas import datetime
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from math import sqrt
from matplotlib import pyplot
import numpy
2.引入数据
其中data为全部数据、traffic_feature为特征集、traffic_target目标集
data=[]
traffic_feature=[]
traffic_target=[]
csv_file = csv.reader(open('GoodData.csv'))
for content in csv_file:
content=list(map(float,content))
if len(content)!=0:
data.append(content)
traffic_feature.append(content[0:4])
traffic_target.append(content[-1])
traffic_feature=np.array(traffic_feature)
traffic_target=np.array(traffic_target)
data=np.array(data)
对数据进行分割,本文使用70%为分割点。
feature_train=traffic_feature[0:int(len(traffic_feature)*0.7)]
feature_test=traffic_feature[int(len(traffic_feature)*0.7):int(len(traffic_feature))]
target_train=traffic_target[0:int(len(traffic_target)*0.7)]
target_test =traffic_target[int(len(traffic_target)*0.7):int(len(traffic_target))]
对后30%的目标值继续分割,分割点仍然为70%,预留。
target_test_last=target_test[int(len(target_test)*0.7):int(len(target_test))]
3.数据标准化
使用StandardScaler()方法将特征数据标准化归一化。
scaler = StandardScaler() # 标准化转换
scaler.fit(traffic_feature) # 训练标准化对象
traffic_feature= scaler.transform(traffic_feature) # 转换数据集
4.使用ELM算法预测
class HiddenLayer:
def __init__(self, x, num): # x:输入矩阵 num:隐含层神经元个数
row = x.shape[0]
columns = x.shape[1]
rnd = np.random.RandomState(9999)
self.w = rnd.uniform(-1, 1, (columns, num)) #
self.b = np.zeros([row, num], dtype=float) # 随机设定隐含层神经元阈值,即bi的值
for i in range(num):
rand_b = rnd.uniform(-0.4, 0.4) # 随机产生-0.4 到 0.4 之间的数
for j in range(row):
self.b[j, i] = rand_b # 设定输入层与隐含层的连接权值
self.h = self.sigmoid(np.dot(x, self.w) + self.b) # 计算隐含层输出矩阵H
self.H_ = np.linalg.pinv(self.h) # 获取H的逆矩阵
# print(self.H_.shape)
# 定义激活函数g(x) ,需为无限可微函数
def sigmoid(self, x):
print(x)
return 1.0 / (1 + np.exp(-x))
''' 若进行分析的训练数据为回归问题,则使用该方式 ,计算隐含层输出权值,即β '''
def regressor_train(self, T):
C = 2
I = len(T)
sub_former = np.dot(np.transpose(self.h), self.h) + I / C
all_m = np.dot(np.linalg.pinv(sub_former), np.transpose(self.h))
T = T.reshape(-1, 1)
self.beta = np.dot(all_m, T)
return self.beta
"""
计算隐含层输出权值,即β
"""
def classifisor_train(self, T):
en_one = OneHotEncoder()
# print(T)
T = en_one.fit_transform(T.reshape(-1, 1)).toarray() # 独热编码之后一定要用toarray()转换成正常的数组
# print(T)
C = 3
I = len(T)
sub_former = np.dot(np.transpose(self.h), self.h) + I / C
all_m = np.dot(np.linalg.pinv(sub_former), np.transpose(self.h))
self.beta = np.dot(all_m, T)
return self.beta
def regressor_test(self, test_x):
b_row = test_x.shape[0]
h = self.sigmoid(np.dot(test_x, self.w) + self.b[:b_row, :])
result = np.dot(h, self.beta)
return result
def classifisor_test(self, test_x):
b_row = test_x.shape[0]
h = self.sigmoid(np.dot(test_x, self.w) + self.b[:b_row, :])
result = np.dot(h, self.beta)
result = [item.tolist().index(max(item.tolist())) for item in result]
return result
跑起来~
设置神经元个数为8,可以自行调优。
import matplotlib.pyplot as plt
from sklearn.metrics import explained_variance_score
a = HiddenLayer(feature_train,8)
a.regressor_train(target_train)
result = a.regressor_test(feature_test)
plt.plot(result)#测试数组
plt.plot(target_test)#测试数组
plt.legend(['ELM','TRUE'])
fig = plt.gcf()
fig.set_size_inches(18.5, 10.5)
plt.title("ELM") # 标题
plt.show()
print("EVS:",explained_variance_score(target_test,result))
结果如下:
EVS等于0.8705
5.使用真实值减预测值,获的ELM的残差
a=[]#真实值
for i in target_test:
a.append(i)
b=[]#预测值
for i in result:
b.append(i[0])
c=[]#残差值
num=[]
for inx,i in enumerate(a):
c.append(b[inx]-i)
num.append(inx)
plt.plot(c)#残差
fig = plt.gcf()
fig.set_size_inches(18.5,5)
plt.xlim(0,1560)
plt.title("Residual Signal") # 标题
plt.show()
残差值变化如下:
将残差的后30%截取,预留
result_last=b[int(len(b)*0.7):int(len(b))]
6.对残差值使用长短记忆神经网络(LSTM)预测
使用残差值的前70%作为测试集,使用后30%作为验证集。
train, test =c[0:int(len(c)*0.7)], c[int(len(c)*0.7):int(len(c))]
def timeseries_to_supervised(data, lag=1):
df = DataFrame(data)
columns = [df.shift(i) for i in range(1, lag+1)]
columns.append(df)
df = concat(columns, axis=1)
df.fillna(0, inplace=True)
return df
def fit_lstm(train, batch_size, nb_epoch, neurons):
X, y = train[:, 0:-1], train[:, -1]
X = X.reshape(X.shape[0], 1, X.shape[1])
model = Sequential()
model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]), stateful=True))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
for i in range(nb_epoch):
model.fit(X, y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
model.reset_states()
return model
# make a one-step forecast
def forecast_lstm(model, batch_size, X):
X = X.reshape(1, 1, len(X))
yhat = model.predict(X, batch_size=batch_size)
return yhat[0,0]
c2d=[]
for i in c:
c2d.append([i,i])
scaler = StandardScaler() # 标准化转换
scaler.fit(c2d) # 训练标准化对象
supervised= scaler.transform(c2d) # 转换数据集
c1d=[]
for j in supervised:
c1d.append(j[0])
supervised = timeseries_to_supervised(c1d, 1)
train_scaled, test_scaled =supervised[0:int(len(supervised)*0.7)], supervised[int(len(supervised)*0.7):int(len(supervised))]
train_scaled=np.array(train_scaled)
test_scaled=np.array(test_scaled)
print("开始")
# fit the model
lstm_model = fit_lstm(train_scaled, 1, 30, 4)
# forecast the entire training dataset to build up state for forecasting
train_reshaped = train_scaled[:, 0].reshape(len(train_scaled), 1, 1)
lstm_model.predict(train_reshaped, batch_size=1)
# walk-forward validation on the test data
predictions = list()
for i in range(len(test_scaled)):
# make one-step forecast
X, y = test_scaled[i, 0:-1], test_scaled[i, -1]
yhat = forecast_lstm(lstm_model, 1, X)
# store forecast
predictions.append(yhat)
print("结束")
经过数据格式改变后,残差预测效果如下。
predictions2d=[]
for i in predictions:
predictions2d.append([i,i])
predictions2d
predictions2d=scaler.inverse_transform(predictions2d)
predictions1d=[]
for j in predictions2d:
predictions1d.append(j[0])
# report performance
rmse = sqrt(mean_squared_error(test, predictions1d))
print('RMSE: %.3f' % rmse)
# line plot of observed vs predicted
fig = pyplot.gcf()
fig.set_size_inches(18.5, 10.5)
pyplot.plot(test)
pyplot.plot(predictions1d)
pyplot.show()
7.ELM与ELM-LSTM预测结果进行对比
ELM:
print("mse:",metrics.mean_squared_error(target_test_last,result_last))
print("R2:",metrics.r2_score(target_test_last,result_last))
ELM-LSTM:
test1=np.array(test)
predictions1d1=np.array(predictions1d)
result1=result_last-test1+predictions1d1
print("mse:",metrics.mean_squared_error(target_test_last,result1))
print("R2:",metrics.r2_score(target_test_last,result1))
COMPARE:
x=range(1089,1556,1)
plt.plot(x,target_test_last,marker=',')
plt.plot(x,result_last,marker='+')
plt.plot(x,result1,marker='x')
plt.legend(['True','ELM','ELM_LSTM'])
fig = plt.gcf()
fig.set_size_inches(18.5, 10.5)
plt.title("COMPARE") # 标题
plt.show()
结果如下:
由上可知ELM-LSTM比ELM的R方要高、MSE要低。且下图显示ELM-LSTM比ELM更贴近真实值。
提供思路