Predicting Future Stock Price Trends with a Deep Reinforcement Learning Algorithm (A3C)

This project uses the A3C algorithm from deep reinforcement learning to extract features from the historical data of a single stock and then predict the trend of its closing price over the next 15 days.
Notes:
1) This project uses TensorFlow 1.14.
2) Investing is risky; trade with caution.
3) My training results on one chosen stock are shown below. In practice, when the market is relatively stable, the code correctly predicts whether the price will rise or fall over the next few days, although the predicted prices themselves deviate somewhat from the actual values.

1. Data Import
Download the historical data of the stock you are interested in from a financial website, then point the CSV path in the Python code below to where you saved that file. The environment class in this section should be saved as date_env.py, since the main code in section 2 imports it with from date_env import Env. After 1000 training episodes, the model predicts the closing-price trend for the next 15 days.
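The column layout of the downloaded CSV differs between websites; the environment below reads the column at index 3 as the closing price and drops the header row. Before training, you can confirm which column actually holds the closing price with a quick check like the following (a minimal sketch; 600420.csv stands in for your own downloaded file):

import csv

# Print each column index together with its header name and first value,
# so you can confirm which index holds the closing price
# (the environment below reads index 3).
with open('600420.csv', 'r') as f:
    reader = csv.reader(f)
    header = next(reader)      # the first row holds the column names
    first_row = next(reader)   # the first data row
for idx, (name, value) in enumerate(zip(header, first_row)):
    print(idx, name, value)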

"""
Created on Sun Jan  2 09:48:13 2022

@author: USER
"""
"""
self.x 存储收盘价,剔除数据为零的无效数据
self.x中读取的csv文件数据顺序和scv中顺序一样,注意数据时间的排序
注意训练的数据,不要用其他的数据训练并预测,同时注意读取的收盘价的数据,要修改代码读取csv对应的列数据
"""
import matplotlib.pyplot as plt
import csv
class Env():
    def __init__(self):
        #range of the continuous action (the predicted closing price)
        self.action_up=100
        self.action_down=0
        #index of the most recent day already observed (the right edge of the window)
        self.observation_lenth=100
        #number of data points in each observation window
        self.length_obs=100
        self.obs_step=1 #the window advances one day per step
        self.observation_space_features=100
        #length of the original data (set in read_data) and the prediction horizon
        self.data_lenth=0
        self.predict_lenth=15
        #predicted closing prices
        self.predict_list=[]
        #read the closing prices last, so that read_data can record the data length
        self.x=self.read_data()
    def read_data(self):
        path = 'D:\Simulation\优秀项目\Stock\Stock_Predict - 14.0\\600420.csv'
        x=[]
        with open(path, 'r') as f:
            reader = csv.reader(f)
            for i in reader:
                x.append(i[3]) #read the column at index 3 (the closing price); the values are read as str
        del(x[0])  #drop the first element: it is the header text and cannot be converted to float
        #convert the str elements of x to float
        y=[]
        for i in x:
            #skip elements equal to 0, since such records are invalid
            k=float(i)
            if k!=0:
                y.append(k)
        """reverse y so that older prices come first and recent prices last; uncomment the print below to check"""
        y=y[::-1]
        #print("data check, last three days:",y[-3:])
        #record the length of the original data
        self.data_lenth=len(y)
        return y
    def step(self,action):
        self.observation_lenth+=self.obs_step
        if self.data_lenth<=self.observation_lenth: #the window has reached the end of the historical data, so the episode is finished
            done=True
        else:
            done=False
        observation_=self.x[self.observation_lenth-self.length_obs:self.observation_lenth]
        #the closer the predicted price is to the newest observed price, the larger the reward
        reward=10/(abs(observation_[-1]-action[0])+1)
        #print("predicted: %f " %(action),"actual: %f"%(observation_[-1]),reward)
        return observation_, reward, done
    
    def predic_step(self,action):
        self.observation_lenth+=self.obs_step
        if self.data_lenth==self.observation_lenth:
            print("last three values of the original data:",self.x[-3:])
        #once past the end of the historical data, append the predicted value
        if self.data_lenth<self.observation_lenth:
            self.x.append(float(action[0][0])) #store the prediction as a plain float so the window stays homogeneous
            self.predict_list.append(action[0][0])
        #the window has covered the historical data plus the prediction horizon, so we are done
        if (self.data_lenth+self.predict_lenth)<=self.observation_lenth:
            done=True
            print("predicted values:",self.predict_list)
            plt.plot(list(range(len(self.predict_list))),self.predict_list)
            plt.xlabel("day")
            plt.ylabel("price")
            plt.show()
        else:
            done=False
        observation_=self.x[self.observation_lenth-self.length_obs:self.observation_lenth]
        #compare the prediction with the newest value in the window; the smaller the absolute error,
        #the larger the reward (10 keeps the reward from becoming too large)
        reward=10/(abs(observation_[-1]-action[0])+1)
        return observation_, reward, done
    def reset(self):
        self.length_obs=100
        self.observation_lenth=100
        #re-read the data so that any appended predictions are discarded
        self.x=self.read_data()
        observation=self.x[0:self.observation_lenth]
        return observation
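
Before moving on to the networks, the sliding-window mechanics of the environment can be sanity-checked on their own. The sketch below assumes the class above is saved as date_env.py (the module name the main code imports) and uses 50.0 purely as a placeholder prediction:

from date_env import Env

# reset() returns the first 100 closing prices; each step() moves the window
# forward by one day and rewards predictions close to the newest price:
#   reward = 10 / (|newest_close - predicted_close| + 1)
# e.g. a prediction that is off by 1.0 earns 10 / 2 = 5.0.
env = Env()
obs = env.reset()
print("window length:", len(obs))        # 100
obs_, reward, done = env.step([50.0])    # 50.0 is a placeholder predicted price
print("newest close:", obs_[-1], "reward:", reward, "done:", done)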

2. Main Code

import tensorflow as tf
import numpy as np
from date_env import Env
import matplotlib.pyplot as plt
RENDER = False

"""
1.根据price,设置合适的a_mu和a_sigma,因为输出激活函数时tanh只在-1到1之间
2.输出shape只读取了一行csv中的一行数据,如果读取多行需要对环境和神经网络结构做调整
"""
class Sample():
    def __init__(self,env, policy_net):
        self.env = env
        self.policy_net=policy_net
        self.gamma = 0.90
    def sample_step(self,observation):
        obs_next = []
        obs = []
        actions = []
        r = []  
        state = np.reshape(observation, [-1, 100])
        action = self.policy_net.choose_action(state)
        observation_, reward, done = self.env.step(action)
        # store the current observation
        obs.append(np.reshape(observation, [-1, 100]))
        # store the next observation
        obs_next.append(np.reshape(observation_, [-1, 100]))
        actions.append(action)
        # store the immediate reward
        r.append(reward)
        # reshape the observations and rewards
        obs = np.reshape(obs, [len(obs), self.policy_net.n_features])
        obs_next = np.reshape(obs_next, [len(obs_next), self.policy_net.n_features])
        actions = np.reshape(actions, [len(actions),1])
        r = np.reshape(r, [len(r),1])
        """debugging area"""
        #print(action[0],observation_[-1])
        return obs, obs_next, actions, r, done, reward
#define the policy network
class Policy_Net():
    def __init__(self, env, action_bound, lr = 0.0001, model_file=None):
        self.learning_rate = lr
        #dimension of the input features
        self.n_features = env.observation_space_features
        print("input feature dimension:", self.n_features)  #debug check of the observation size
        #dimension of the action space
        self.n_actions = 1
        #1.1 input layer
        self.obs = tf.placeholder(tf.float32, shape=[None, self.n_features])
        #1.2 first hidden layer of the policy network
        self.a_f1 = tf.layers.dense(inputs=self.obs, units=2000, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
                             bias_initializer=tf.constant_initializer(0.1))
        #1.3 second layer: the mean of the action distribution
        a_mu = tf.layers.dense(inputs=self.a_f1, units=self.n_actions, activation=tf.nn.tanh, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
                                       bias_initializer=tf.constant_initializer(0.1))
        #1.3 second layer: the standard deviation of the action distribution
        a_sigma = tf.layers.dense(inputs=self.a_f1, units=self.n_actions, activation=tf.nn.softplus, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
                                       bias_initializer=tf.constant_initializer(0.1))
        """注意神经网络最后是用的tanh输出的,值大小只在-1到1之间,因此,下面要进行值范围的扩大"""
        self.a_mu = 15*a_mu
        self.a_sigma =a_sigma+0.001
        """定义带参数的正态分布,a_mu和a_sigma必须是float类型"""
        self.normal_dist = tf.contrib.distributions.Normal(self.a_mu, self.a_sigma)
        #sample an action from the normal distribution
        """sample a point from the normal distribution at random; tf.clip_by_value limits its range, clamping values below or above the bounds to the nearest bound"""
        self.action = tf.clip_by_value(self.normal_dist.sample(1), action_bound[0],action_bound[1])
        #1.5 placeholders for the current action and delta
        self.current_act = tf.placeholder(tf.float32, [None,1])
        self.delta = tf.placeholder(tf.float32, [None,1])
        #2. build the loss function
        """normal_dist.log_prob(action) is the log-probability of the action under the normal distribution"""
        log_prob = self.normal_dist.log_prob(self.current_act)
        self.a_loss = tf.reduce_mean(log_prob*self.delta+0.01*self.normal_dist.entropy())
        # self.loss += 0.01*self.normal_dist.entropy()
        #3. define an optimiser for the actor
        self.a_train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(-self.a_loss)
        "4.定义critic网络"
        self.c_f1 = tf.layers.dense(inputs=self.obs, units=2000, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
                             bias_initializer=tf.constant_initializer(0.1))
        self.v = tf.layers.dense(inputs=self.c_f1, units=1, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),\
                             bias_initializer=tf.constant_initializer(0.1))
        #define the critic loss; the input is the TD target
        self.td_target = tf.placeholder(tf.float32, [None,1])
        self.c_loss = tf.square(self.td_target-self.v)
        self.c_train_op = tf.train.AdamOptimizer(0.0002).minimize(self.c_loss)
        #5. tf session
        self.sess = tf.Session()
        #6. initialise the variables in the graph
        self.sess.run(tf.global_variables_initializer())
        #7. define model saving and restoring
        self.saver = tf.train.Saver()
        if model_file is not None:
            self.restore_model(model_file)
    #choose an action according to the policy distribution
    def choose_action(self, state):
        action = self.sess.run(self.action, {self.obs:state})
        return action[0]
    #define one training step
    def train_step(self, state, state_next, label, reward):
        #build the delta (advantage) data
        gamma = 0.90
        # print("reward",reward)
        td_target = reward + gamma*self.sess.run(self.v, feed_dict={self.obs:state_next})[0]
        # print("td_target",td_target)
        delta = td_target - self.sess.run(self.v, feed_dict={self.obs:state})
        c_loss, _ = self.sess.run([self.c_loss, self.c_train_op],feed_dict={self.obs: state, self.td_target: td_target})
        a_loss, _ =self.sess.run([self.a_loss, self.a_train_op], feed_dict={self.obs:state, self.current_act:label, self.delta:delta})
        return a_loss, c_loss
    #define the model-saving function
    def save_model(self, model_path,global_step):
        self.saver.save(self.sess, model_path,global_step=global_step)
    #define the model-restoring function
    def restore_model(self, model_path):
        self.saver.restore(self.sess, model_path)
def policy_train(env, brain, training_num):
    reward_sum_line = []
    training_time = []
    average_reward = 0
    for i in range(training_num):
        observation = env.reset()
        total_reward = 0
        while True:
            sample = Sample(env,brain)
            #sample one step of data
            current_state,next_state, current_action, current_r,done,c_r= sample.sample_step(observation)
            # print(current_r)
            total_reward += c_r
            #train the actor-critic networks
            a_loss,c_loss = brain.train_step(current_state,next_state, current_action,current_r)
            if done:
                break
            observation = next_state
        if i == 0:
            average_reward = total_reward
        else:
            average_reward = 0.95*average_reward + 0.05*total_reward
        reward_sum_line.append(average_reward)
        training_time.append(i)
        #print("number of episodes:%d, current average reward is %f"%(i,average_reward))
        
        print("第:%d次 Actor的损失值:%f Critic的损失值:%f"%(i,a_loss,c_loss))
        """======每训练20次进行保存网络,第一次不存储----"""
        if i>0 and i%20==0:
            brain.save_model('net/',i)
        if a_loss<0.001 and c_loss<0.001:
            print("预测值足够小,满足要求")
            break
        
    plt.plot(training_time, reward_sum_line)
    plt.xlabel("training number")
    plt.ylabel("score")
    plt.show()
def policy_test(env, policy,RENDER):
    observation = env.reset()
    reward_sum = 0
    reward_list=[]
    while True:
        # generate an action from the policy network
        state = np.reshape(observation, [-1, 100])
        action = policy.choose_action(state)
        observation_, reward, done = env.predic_step(action)
        # print(reward)
        reward_list.append(reward[0])
        reward_sum+=reward[0]
        if done:
            break
        observation = observation_
    #print(reward_sum)
    #plt.plot(list(range(len(reward_list))),reward_list)
    #plt.xlabel("training number")
    #plt.ylabel("score")
    #plt.show()

    return reward_list


if __name__=='__main__':
    #create the environment
    
    env = Env()
    action_bound = [env.action_down,env.action_up]
    #instantiate the policy network, e.g. model_file='D:\\Simulation\\优秀项目\\股票交易\\Stock_Predict\\net\\-99'
    brain = Policy_Net(env,action_bound,model_file='D:\\Simulation\\优秀项目\\Stock\\Stock_Predict - 14.0\\net\\-80')
    #number of training episodes
    training_num = 1000
    """===策略训练==="""
    #policy_train(env, brain, training_num)
    #test the trained policy
    """=== prediction === note: for prediction, brain must be constructed with model_file pointing to the saved model path"""
    reward_sum = policy_test(env, brain,True)
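
Because policy_train is commented out above and the prediction call relies on a previously saved checkpoint, the script is effectively run in two phases. One possible arrangement of the __main__ block is sketched below; the checkpoint suffix '-80' is only an example and must match a step you actually saved:

if __name__=='__main__':
    env = Env()
    action_bound = [env.action_down, env.action_up]

    TRAIN = False  # set to True for the training phase, False to predict

    if TRAIN:
        # phase 1: train from scratch; checkpoints are written to net/ every 20 episodes
        brain = Policy_Net(env, action_bound)
        policy_train(env, brain, 1000)
    else:
        # phase 2: restore a saved checkpoint and predict the next 15 closing prices
        brain = Policy_Net(env, action_bound, model_file='net/-80')
        policy_test(env, brain, True)
        print("predicted closing prices:", env.predict_list)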
    