本项目利用深度强化学习中的A3C算法提取某支股票的历史数据特征,然后预测未来15天的收盘价格走势。
注:
1)本项目使用tensorflow1.14版本。
2)投资有风险,理财需谨慎。
3)本人选择某股训练结果如下,通过实践表明,在市场环境相对稳定的情况下,本代码能够正确预测未来几天内的升降情况。但是单价数据会有些出入。
1.数据导入
需要从财经类网站自行下载自己意向的股票历史数据,将数据导入py文件中的python代码路径指向保存好的股票历史路径。通过训练1000次学习后预算未来15天的收盘价走势。
"""
Created on Sun Jan 2 09:48:13 2022
@author: USER
"""
"""
self.x 存储收盘价,剔除数据为零的无效数据
self.x中读取的csv文件数据顺序和scv中顺序一样,注意数据时间的排序
注意训练的数据,不要用其他的数据训练并预测,同时注意读取的收盘价的数据,要修改代码读取csv对应的列数据
"""
import matplotlib.pyplot as plt
import csv
class Env():
def __init__(self):
self.x=self.read_data()
#定义连续动作的范围
self.action_up=100
self.action_down=0
#已经观测的个数
self.observation_lenth=100
#每次观测的数据个数
self.length_obs=100
self.obs_step=1 #每次移动一个为步长
self.observation_space_features=100
#记录最初的数据长度,预算未来长度
self.data_lenth=0
self.predict_lenth=15
#预测数据
self.predict_list=[]
def read_data(self):
path = 'D:\Simulation\优秀项目\Stock\Stock_Predict - 14.0\\600420.csv'
x=[]
with open(path, 'r') as f:
reader = csv.reader(f)
for i in reader:
x.append(i[3]) #读取第三列元素,type是str类型
del(x[0]) #删除list中第一个元素,因为它是文字,不能转化为float
#将x中的元素str转化为float类型
y=[]
for i in x:
#去掉数据中为0的元素,因为这种数据是错误的
k=float(i)
if k!=0:
y.append(k)
"""逆向排列y中元素,把近期的放在后面,以前放在前面,显示检查print"""
y=y[::-1]
#print("数据检查,近三天数据为:",y[-3:])
#计算最初数据长度
self.data_lenth=len(y)
return y
def step(self,action):
self.observation_lenth+=self.obs_step
if self.data_lenth<=self.observation_lenth: #这一步刚好等于数据+预测长度,那么做完了
done=True
else:
done=False
observation_=self.x[self.observation_lenth-self.length_obs:self.observation_lenth]
reward=10/(abs(observation_[-1]-action[0])+1)
#print("预测值:%f " %(action),"实际值:%f"%(observation_[-1]),reward)
return observation_, reward, done
def predic_step(self,action):
self.observation_lenth+=self.obs_step
if self.data_lenth==self.observation_lenth:
print("原始数据值后三个值为:",self.x[-3:])
#超出数据长度后附加预测值
if self.data_lenth<self.observation_lenth:
self.x.append(action)
self.predict_list.append(action[0][0])
#这一步刚好等于数据+预测长度,那么做完了
if (self.data_lenth+self.predict_lenth)<=self.observation_lenth:
done=True
print("预测值为:",self.predict_list)
plt.plot(list(range(len(self.predict_list))),self.predict_list)
plt.xlabel("day")
plt.ylabel("price")
plt.show()
else:
done=False
observation_=self.x[self.observation_lenth-self.length_obs:self.observation_lenth]
#取action的一个数据与观测到的下一天的数据做差,差越小奖励越大,绝对值倒数的方法表示,10控制奖励过大
reward=10/(abs(self.x[self.length_obs]-action[0])+1)
return observation_, reward, done
def reset(self):
self.length_obs=100
self.observation_lenth=100
#重新读取没预测的数据
self.x=self.read_data()
observation=self.x[0:self.observation_lenth]
return observation
2.主代码部分
import tensorflow as tf
import numpy as np
from date_env import Env
import matplotlib.pyplot as plt
RENDER = False
"""
1.根据price,设置合适的a_mu和a_sigma,因为输出激活函数时tanh只在-1到1之间
2.输出shape只读取了一行csv中的一行数据,如果读取多行需要对环境和神经网络结构做调整
"""
class Sample():
def __init__(self,env, policy_net):
self.env = env
self.policy_net=policy_net
self.gamma = 0.90
def sample_step(self,observation):
obs_next = []
obs = []
actions = []
r = []
state = np.reshape(observation, [-1, 100])
action = self.policy_net.choose_action(state)
observation_, reward, done = self.env.step(action)
# 存储当前观测
obs.append(np.reshape(observation, [-1, 100]))
# 存储后继观测
obs_next.append(np.reshape(observation_, [-1, 100]))
actions.append(action)
# 存储立即回报
r.append(reward)
# reshape 观测和回报
obs = np.reshape(obs, [len(obs), self.policy_net.n_features])
obs_next = np.reshape(obs_next, [len(obs_next), self.policy_net.n_features])
actions = np.reshape(actions, [len(actions),1])
r = np.reshape(r, [len(r),1])
"""观察区"""
#print(action[0],observation_[-1])
return obs, obs_next, actions, r, done,reward
#定义策略网络
class Policy_Net():
def __init__(self, env, action_bound, lr = 0.0001, model_file=None):
self.learning_rate = lr
#输入特征的维数
self.n_features = env.observation_space_features
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print(self.n_features)
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
print("----------------------------------")
#输出动作空间的维数
self.n_actions = 1
#1.1 输入层
self.obs = tf.placeholder(tf.float32, shape=[None, self.n_features])
#1.2.策略网络第一层隐含层
self.a_f1 = tf.layers.dense(inputs=self.obs, units=2000, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
bias_initializer=tf.constant_initializer(0.1))
#1.3 第二层,均值
a_mu = tf.layers.dense(inputs=self.a_f1, units=self.n_actions, activation=tf.nn.tanh, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
bias_initializer=tf.constant_initializer(0.1))
#1.3 第二层,标准差
a_sigma = tf.layers.dense(inputs=self.a_f1, units=self.n_actions, activation=tf.nn.softplus, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
bias_initializer=tf.constant_initializer(0.1))
"""注意神经网络最后是用的tanh输出的,值大小只在-1到1之间,因此,下面要进行值范围的扩大"""
self.a_mu = 15*a_mu
self.a_sigma =a_sigma+0.001
"""定义带参数的正态分布,a_mu和a_sigma必须是float类型"""
self.normal_dist = tf.contrib.distributions.Normal(self.a_mu, self.a_sigma)
#根据正态分布采样一个动作
"""在正分布中随机sample一个点,tf.clip_by_value限制大小范围,小于边界就取边界,大于就取边界"""
self.action = tf.clip_by_value(self.normal_dist.sample(1), action_bound[0],action_bound[1])
#1.5 当前动作,输入为当前动作,delta,
self.current_act = tf.placeholder(tf.float32, [None,1])
self.delta = tf.placeholder(tf.float32, [None,1])
#2. 构建损失函数
"""normal_dist.log_prob(action)表示action在正态分布中的概率"""
log_prob = self.normal_dist.log_prob(self.current_act)
self.a_loss = tf.reduce_mean(log_prob*self.delta+0.01*self.normal_dist.entropy())
# self.loss += 0.01*self.normal_dist.entropy()
#3. 定义一个动作优化器
self.a_train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(-self.a_loss)
"4.定义critic网络"
self.c_f1 = tf.layers.dense(inputs=self.obs, units=2000, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
bias_initializer=tf.constant_initializer(0.1))
self.v = tf.layers.dense(inputs=self.c_f1, units=1, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
bias_initializer=tf.constant_initializer(0.1))
#定义critic网络的损失函数,输入为td目标
self.td_target = tf.placeholder(tf.float32, [None,1])
self.c_loss = tf.square(self.td_target-self.v)
self.c_train_op = tf.train.AdamOptimizer(0.0002).minimize(self.c_loss)
#5. tf工程
self.sess = tf.Session()
#6. 初始化图中的变量
self.sess.run(tf.global_variables_initializer())
#7.定义保存和恢复模型
self.saver = tf.train.Saver()
if model_file is not None:
self.restore_model(model_file)
#依概率选择动作
def choose_action(self, state):
action = self.sess.run(self.action, {self.obs:state})
return action[0]
#定义训练
def train_step(self, state, state_next, label, reward):
#构建delta数据
gamma = 0.90
# print("reward",reward)
td_target = reward + gamma*self.sess.run(self.v, feed_dict={self.obs:state_next})[0]
# print("td_target",td_target)
delta = td_target - self.sess.run(self.v, feed_dict={self.obs:state})
c_loss, _ = self.sess.run([self.c_loss, self.c_train_op],feed_dict={self.obs: state, self.td_target: td_target})
a_loss, _ =self.sess.run([self.a_loss, self.a_train_op], feed_dict={self.obs:state, self.current_act:label, self.delta:delta})
return a_loss, c_loss
#定义存储模型函数
def save_model(self, model_path,global_step):
self.saver.save(self.sess, model_path,global_step=global_step)
#定义恢复模型函数
def restore_model(self, model_path):
self.saver.restore(self.sess, model_path)
def policy_train(env, brain, training_num):
reward_sum_line = []
training_time = []
average_reward = 0
for i in range(training_num):
observation = env.reset()
total_reward = 0
while True:
sample = Sample(env,brain)
#采样数据
current_state,next_state, current_action, current_r,done,c_r= sample.sample_step(observation)
# print(current_r)
total_reward += c_r
#训练AC网络
a_loss,c_loss = brain.train_step(current_state,next_state, current_action,current_r)
if done:
break
observation = next_state
if i == 0:
average_reward = total_reward
else:
average_reward = 0.95*average_reward + 0.05*total_reward
reward_sum_line.append(average_reward)
training_time.append(i)
#print("number of episodes:%d, current average reward is %f"%(i,average_reward))
print("第:%d次 Actor的损失值:%f Critic的损失值:%f"%(i,a_loss,c_loss))
"""======每训练20次进行保存网络,第一次不存储----"""
if i>0 and i%20==0:
brain.save_model('net/',i)
if a_loss<0.001 and c_loss<0.001:
print("预测值足够小,满足要求")
break
plt.plot(training_time, reward_sum_line)
plt.xlabel("training number")
plt.ylabel("score")
plt.show()
def policy_test(env, policy,RENDER):
observation = env.reset()
reward_sum = 0
reward_list=[]
while True:
# 根据策略网络产生一个动作
state = np.reshape(observation, [-1, 100])
action = policy.choose_action(state)
observation_, reward, done = env.predic_step(action)
# print(reward)
reward_list.append(reward[0])
reward_sum+=reward[0]
if done:
break
observation = observation_
#print(reward_sum)
#plt.plot(list(range(len(reward_list))),reward_list)
#plt.xlabel("training number")
#plt.ylabel("score")
#plt.show()
return reward_list
if __name__=='__main__':
#创建环境
env = Env()
action_bound = [env.action_down,env.action_up]
#实例化策略网络,model_file='D:\\Simulation\\优秀项目\\股票交易\\Stock_Predict\\net\\-99'
brain = Policy_Net(env,action_bound,model_file='D:\\Simulation\\优秀项目\\Stock\\Stock_Predict - 14.0\\net\\-80')
#训练时间
training_num = 1000
"""===策略训练==="""
#policy_train(env, brain, training_num)
#测试训练好的策略
"""===预测===注意训练时,brain中加入model_file,指向模型路径"""
reward_sum = policy_test(env, brain,True)