时间序列
时间序列预测,可以对小样本预测。使用时间序列预测,数据必须要满足平稳性要求。
平稳性: 要使用时间序列预测数据,数据需要满足平稳性要求。一般要求数据的均值和方差不随时间发生明显变化。
严平稳:任意阶的统计分布都不随时间变化,典型例子是高斯白噪声(均值为 0、方差恒定的正态分布序列)。这种要求在真实的数据集上很难达到,通常采用的是宽平稳。
宽平稳:数据的期望与相关系数,不发生明显变化。例如,要预测今天的下雨量,那么它依赖于昨天的下雨量,而昨天的下雨量依赖于前天的下雨量。这期间的相关系数不应变化很大。
为了使数据满足平稳性要求,可以使用差分运算。
时间序列股票预测
from random import random
import warnings
from tokenize import PlainToken
import pandas as pd
import numpy as np
# import matplotlib.pylab as plt
import matplotlib.pyplot as plt
import statsmodels.tsa.stattools as ts
import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_acf,plot_pacf
from statsmodels.tsa.arima_model import ARIMA
import seaborn as sns
warnings.filterwarnings('ignore', 'statsmodels.tsa.arima_model.ARMA',FutureWarning)
def get_data(num):
    """Generate a synthetic daily time series of `num` standard-normal samples.

    Parameters
    ----------
    num : int
        Number of observations (and of daily index entries) to generate.

    Returns
    -------
    pd.Series
        Random values indexed by consecutive days starting 2022-02-04.
    """
    # BUG FIX: the original hard-coded periods=20, so for num != 20 the data
    # and the index had different lengths and Series construction failed.
    return pd.Series(np.random.randn(num),
                     index=pd.date_range('2022-02-04', periods=num, freq='D'))
def cal_diff(data):
    """Return the first-order difference of `data` as a plain list.

    Accepts any indexable/sliceable sequence (list, numpy array, ...);
    the result has one element fewer than the input.
    """
    # Pair each element with its successor and subtract.
    return [nxt - cur for cur, nxt in zip(data, data[1:])]
def Acf_Pacf(data):
    """Plot the ACF and PACF of `data` (typically the differenced series) to
    help choose the MA order q (from ACF) and AR order p (from PACF), then
    plot the raw training series for visual comparison.

    NOTE(review): relies on the module-level global `stock_train` for the
    raw-data plot -- confirm it is defined before calling.
    """
    # Autocorrelation plot: its cut-off suggests the MA order q.
    acf_fig = plot_acf(data, lags=20)
    plt.title('ACF')
    acf_fig.show()
    # Partial autocorrelation plot: its cut-off suggests the AR order p.
    pacf_fig = plot_pacf(data, lags=20)
    plt.title("PACF")
    pacf_fig.show()
    # Raw (undifferenced) training data, for reference.
    plt.figure()
    plt.plot(stock_train)
    plt.title("raw-data")
    plt.show()
if __name__ == "__main__":
    # BUG FIX: `file` was never defined in the original script and raised a
    # NameError at runtime. Name the data source explicitly.
    file = 'stock.xlsx'  # TODO: point at the real Excel file of prices
    stock = pd.read_excel(file, index_col=0)
    stock_train = stock[1:50]          # training slice (rows 1..49)
    stock_diff = stock_train.diff()    # first difference, for stationarity
    stock_diff = stock_diff.dropna()   # drop the NaN introduced by diff()
    Acf_Pacf(stock_diff)               # inspect ACF/PACF to pick p and q
    model_pre = ARIMA(stock_train, order=(1, 1, 2))  # order = (p, d, q)
    model = model_pre.fit()
    # Forecast steps 48..70; typ='levels' returns values on the original
    # scale instead of the differenced scale.
    result = model.predict(48, 70, dynamic=True, typ='levels')
    print(result)
LSTM
LSTM股票预测
import os
import pandas_datareader.data as web
import datetime
from sklearn.preprocessing import StandardScaler
from collections import deque
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import Sequential,load_model
from tensorflow.keras.layers import LSTM,Dense,Dropout
import matplotlib.pyplot as plt
# --- hyper-parameters for the LSTM experiment ---
mem_his_days = 5  # look-back window: how many past days feed one sample
pre = 10          # forecast horizon: how many days ahead to predict
feature = 5       # number of input features per day
input_shape = (mem_his_days, feature)  # (look-back days, feature count)
output_shape = 1  # single regression output (predicted close price)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TensorFlow info/warning logs
start = datetime.datetime(2000,1,1)
end = datetime.datetime(2021,9,1)
# Download GOOGL daily quotes from stooq (network I/O at import time).
df = web.DataReader('GOOGL','stooq',start,end)
def get_data(df, pre, mem_his_days):
    """Prepare scaled features and future-shifted labels from the price frame.

    Mutates `df` in place: drops NaN rows, sorts chronologically, and adds a
    'label' column holding the close price `pre` days ahead (tail rows NaN).

    Returns (X, y): X is the standard-scaled feature matrix for every row;
    y holds the label values aligned with the windows built by get_feature.
    """
    df.dropna(inplace=True)
    df.sort_index(inplace=True)
    horizon = pre
    # Label = close price `horizon` days in the future.
    df['label'] = df['Close'].shift(-horizon)
    # Scale every column except the freshly added label column.
    feature_frame = df.iloc[:, :-1]
    X = StandardScaler().fit_transform(feature_frame)
    # Skip the first mem_his_days-1 rows (no complete window yet) and the
    # NaN label tail.
    y = df['label'].values[mem_his_days - 1:-horizon]
    return X, y
def get_feature(sca_X, mem_days=None, horizon=None):
    """Slice the scaled feature matrix into overlapping look-back windows.

    Parameters
    ----------
    sca_X : iterable of rows
        Scaled feature rows, oldest first.
    mem_days : int, optional
        Window length; defaults to the module-level `mem_his_days`.
        (Generalized from the original, which read the global directly.)
    horizon : int, optional
        Number of trailing windows reserved for future prediction; defaults
        to the module-level `pre`.

    Returns
    -------
    (X, X_lately)
        X: all complete windows except the last `horizon`;
        X_lately: the last `horizon` windows (no labels exist for them yet).
    """
    if mem_days is None:
        mem_days = mem_his_days
    if horizon is None:
        horizon = pre
    window = deque(maxlen=mem_days)  # sliding window over the rows
    X = []
    for row in sca_X:
        window.append(list(row))
        # Only once the window is full does it form a complete sample.
        if len(window) == mem_days:
            X.append(list(window))
    X_lately = X[-horizon:]
    X = X[:-horizon]
    return X, X_lately
def Split(X, y, size):
    """Chronological train/test split.

    The last `size` fraction becomes the test set; shuffle=False keeps the
    time order intact (essential for time-series evaluation).
    Returns (X_train, X_test, y_train, y_test).
    """
    return train_test_split(X, y, shuffle=False, test_size=size)
def Model(lstm_layers, dense_layers, units):
    """Build and compile a stacked-LSTM regressor.

    Layout: one input LSTM, `lstm_layers` hidden LSTMs (all returning full
    sequences), one final LSTM collapsing the sequence to a vector,
    `dense_layers` hidden Dense layers, and a single linear output. Every
    layer is followed by 10% dropout. Compiled with Adam / MSE loss / MAPE
    metric.
    """
    net = Sequential()
    # Input layer must return sequences so further LSTMs can stack on top.
    net.add(LSTM(units, input_shape=input_shape, activation='relu',
                 return_sequences=True))
    net.add(Dropout(0.1))
    for _ in range(lstm_layers):
        net.add(LSTM(units, activation='relu', return_sequences=True))
        net.add(Dropout(0.1))
    # Final LSTM emits only the last hidden state.
    net.add(LSTM(units, activation='relu'))
    net.add(Dropout(0.1))
    for _ in range(dense_layers):
        net.add(Dense(units, activation='relu'))
        net.add(Dropout(0.1))
    net.add(Dense(output_shape))
    net.compile(optimizer='adam', loss='mse', metrics=['mape'])
    return net
def best_model():
    """Grid-search over architecture hyper-parameters, training one model per
    combination and checkpointing the best epoch (lowest val_mape) of each.

    NOTE(review): `the_mem_day` is never passed to get_data/get_feature --
    both read the module-level `mem_his_days`, so the mem_days grid axis has
    no effect. The data preparation inside the innermost loop is likewise
    loop-invariant and could be hoisted. Confirm intent before widening the
    grids.
    """
    # Candidate grids; the full grids are kept in comments for a real search.
    mem_days = [5]  # [5,10,15]
    lstm_lays = [1]  # [1,2,3]
    dense_lays = [1]  # [1,2,3]
    units = [32]  # [16,32]
    for the_mem_day in mem_days:
        for the_lstm_lay in lstm_lays:
            for the_dense_lay in dense_lays:
                for the_unit in units:
                    # Checkpoint filename embeds the achieved val_mape
                    # (Keras fills in '{val_mape:.2f}') plus the hyper-params.
                    filepath = '{val_mape:.2f}'+ f'mem_{the_mem_day}_lstm_{the_lstm_lay}_dense_{the_dense_lay}_unit_{the_unit}'
                    checkpoint = ModelCheckpoint(
                        filepath = filepath,
                        save_weights_only = False,
                        monitor = 'val_mape',
                        mode = 'min',
                        save_best_only = True
                    )
                    # Prepare windows/labels (loop-invariant; see NOTE above).
                    sca_X , y = get_data(df,pre,mem_his_days)
                    X, X_lately = get_feature(sca_X)
                    X = np.array(X)
                    y = np.array(y)
                    X_train, X_test, y_train, y_test = Split(X,y,size = 0.1)
                    model = Model(the_lstm_lay,the_dense_lay,the_unit)
                    model.fit(X_train,y_train,batch_size = 32,epochs = 60,validation_data = (X_test,y_test),callbacks=[checkpoint])
if __name__ == "__main__":
    # To (re)train from scratch, call best_model() here instead of loading.
    # Rebuild the same windowed dataset that training used.
    scaled_rows, labels = get_data(df, pre, mem_his_days)
    windows, windows_lately = get_feature(scaled_rows)
    windows = np.array(windows)
    labels = np.array(labels)
    X_train, X_test, y_train, y_test = Split(windows, labels, size=0.1)
    # Each y_test entry is the close `pre` days after its window, so its
    # calendar dates are exactly the last len(y_test) entries of the index.
    df_time = df.index[-len(y_test):]
    # NOTE(review): hard-coded local path -- confirm the saved-model location.
    best = load_model('C:/Users/ASUS/model')
    pret = best.predict(X_test)
    plt.plot(df_time, y_test, color='red')    # actual future close
    plt.plot(df_time, pret, color='green')    # predicted close
    plt.show()