Deep-learning regression prediction with TensorFlow 2.x in Python (using LSTM as the example)
Implementation (the code in this article runs as-is)
import os

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
# Build the regression training set (UCI Auto MPG dataset)
def generate_regression_train_data():
    dataset_path = keras.utils.get_file(
        "auto-mpg.data",
        "http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data")
    column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                    'Acceleration', 'Model Year', 'Origin']
    raw_dataset = pd.read_csv(dataset_path, names=column_names,
                              na_values="?", comment='\t',
                              sep=" ", skipinitialspace=True)
    dataset = raw_dataset.copy()
    dataset = dataset.dropna()
    # One-hot encode the categorical 'Origin' column
    origin = dataset.pop('Origin')
    dataset['USA'] = (origin == 1) * 1.0
    dataset['Europe'] = (origin == 2) * 1.0
    dataset['Japan'] = (origin == 3) * 1.0
    # 80/20 train/test split
    train_dataset = dataset.sample(frac=0.8, random_state=0)
    test_dataset = dataset.drop(train_dataset.index)
    # Normalize features with statistics computed on the training set only
    train_stats = train_dataset.describe()
    train_stats.pop("MPG")
    train_stats = train_stats.transpose()
    y_train = train_dataset.pop('MPG')
    y_test = test_dataset.pop('MPG')
    def norm(x):
        return (x - train_stats['mean']) / train_stats['std']
    X_train = norm(train_dataset)
    X_test = norm(test_dataset)
    x_train = np.array(X_train)
    x_test = np.array(X_test)
    y_train = np.array(y_train)
    y_test = np.array(y_test)
    return x_train, y_train, x_test, y_test
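# Quick shape sanity check (a sketch, not part of the pipeline): the standard
# Auto MPG file has 398 rows; dropna() removes the 6 rows missing 'Horsepower',
# leaving 392 samples and 9 feature columns, so the shapes are roughly:
# x_train, y_train, x_test, y_test = generate_regression_train_data()
# print(x_train.shape, y_train.shape)  # (314, 9) (314,)
# print(x_test.shape, y_test.shape)    # (78, 9) (78,)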
#Func (model-subclassing) build style
class FuncMRegressor(keras.Model):
    def __init__(self, units, num_classes, num_layers):
        super(FuncMRegressor, self).__init__()
        # return_sequences controls what an LSTM layer returns per sample:
        # False (the default) returns only the last time step's hidden state;
        # True returns the hidden states of all time steps. Every LSTM layer
        # except the last must set return_sequences=True so that the next
        # layer still receives the required 3-D input.
        self.rnn = keras.layers.LSTM(units, return_sequences=True)
        # Last layer: return_sequences=False, so each sample yields a 1-D
        # vector (2-D across all samples).
        self.rnn2 = keras.layers.LSTM(units)
        # Final fully connected layer. For an N-class classification problem
        # the last Dense layer has N outputs; for regression it has 1 output.
        self.fc3 = keras.layers.Dense(1)

    # Define the input/output mapping. This overrides keras.Model's call()
    # method, i.e. it is the model's forward-pass expression.
    def call(self, inputs, training=None, mask=None):
        # LSTM (like most deep-learning sequence layers) requires 3-D input,
        # [batch, time steps, features]. For text, an Embedding layer is the
        # usual way to lift 2-D [batch, sentence length] input to 3-D; for
        # tabular data such as this MPG prediction, a new axis is added
        # before calling the model (see __main__ below).
        y = self.rnn(inputs)
        y = self.rnn2(y)
        y = self.fc3(y)
        return y
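# To make the return_sequences distinction concrete, a minimal shape demo
# (a sketch using a toy batch of 32 samples, 9 time steps, 1 feature, 64 units):
# demo = tf.random.normal([32, 9, 1])
# print(keras.layers.LSTM(64, return_sequences=True)(demo).shape)  # (32, 9, 64)
# print(keras.layers.LSTM(64)(demo).shape)                         # (32, 64)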
#Seque (Sequential) build style (recommended)
class SequeRegressor():
    def __init__(self, units):
        self.units = units
        self.model = None

    # Build the network: stack the layers (according to each layer's
    # input/output shape), then set the loss function and the optimizer
    def build_model(self, loss, optimizer, metrics):
        self.model = Sequential()
        self.model.add(LSTM(self.units, return_sequences=True))
        self.model.add(LSTM(self.units))
        self.model.add(Dense(1))
        self.model.compile(loss=loss,
                           optimizer=optimizer,  # the choice of optimizer matters a lot
                           metrics=metrics)
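# Note: since no input shape is declared, the Sequential model above is only
# built on its first fit() call. If model.summary() is wanted before training,
# one option (a sketch, assuming the [9, 1] input shape used below) is:
# model = Sequential([
#     LSTM(64, return_sequences=True, input_shape=(9, 1)),  # 9 time steps, 1 feature
#     LSTM(64),
#     Dense(1),
# ])
# model.summary()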
if __name__ == "__main__":
    print("**********************【Func build style】********************")
    # Get the 2-D feature arrays (features already extracted)
    x_train, y_train, x_test, y_test = generate_regression_train_data()
    # Turn the 2-D arrays into 3-D with np.newaxis, appending a feature axis:
    # [samples, 9] -> [samples, 9, 1], i.e. 9 time steps of 1 feature each
    x_train = x_train[:, :, np.newaxis]
    x_test = x_test[:, :, np.newaxis]
    # Train the model
    units = 64       # number of LSTM units
    num_classes = 1  # 1 for a regression problem
    batch_size = 32
    epochs = 35
    model = FuncMRegressor(units, num_classes, num_layers=2)
    optimizer = tf.keras.optimizers.RMSprop(0.001)
    model.compile(loss='mse',
                  optimizer=optimizer,
                  metrics=['mae', 'mse'])
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
              validation_data=(x_test, y_test), verbose=1)
    # Apply the model for prediction: model.predict returns only y_pred
    out = model.predict(x_train)
    # evaluate() scores the trained model: it returns the loss and the compiled
    # metrics, not predictions for the input data
    scores = model.evaluate(x_test, y_test, batch_size, verbose=1)
    print("Final test loss and metrics:", scores)
print("**********************【Seque搭建方式】********************")
#1 获取训练数据集,并调整为三维输入格式
x_train, y_train, x_test, y_test = generate_regression_train_data()
x_train = x_train[:, :, np.newaxis]
x_test = x_test[:, :, np.newaxis]
#2 构建神经网络模型:(根据各层输入输出的shape)搭建网络结构、确定损失函数、确定优化器
units = 64 #lstm细胞个数
loss = "mse" #损失函数类型
optimizer = tf.keras.optimizers.RMSprop(0.001) #优化器类型
metrics = ['mae', 'mse'] #评估方法类型
srlstm = SequeRegressor(units)
srlstm.build_model(loss, optimizer, metrics)
#3 训练模型
epochs = 35
batch_size = 32
srlstm.model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size)
#4模型评估
score = srlstm.model.evaluate(x_test, y_test, batch_size=16)
print("model score:", score)
#5 模型应用:预测
prediction = srlstm.model.predict(x_test)
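    # Quick cross-check (a sketch): recompute the test MAE by hand from the raw
    # predictions; it should match the 'mae' reported by evaluate() above.
    # flatten() drops the trailing output axis of shape [samples, 1].
    mae = np.mean(np.abs(prediction.flatten() - y_test))
    print("hand-computed test MAE:", mae)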