Multiprocess Training Acceleration for Off-Policy Deep Reinforcement Learning Algorithms (TD3 as an Example)

Approach

Unlike multiprocess acceleration for on-policy algorithms, the child processes of an off-policy algorithm are still used to explore the environment, but the data they collect can be added to the main process's buffer at any time. Furthermore, the policy each child process uses for exploration does not need to stay exactly in sync with the policy being trained in the main process (which is where the name "off-policy" comes from). This makes a multiprocess implementation of off-policy algorithms sound easier, but in practice it turns out to be harder.

Framework: one main process + n child processes + one child thread of the main process
Roles:

  1. Main process: trains the networks and evaluates the policy;
  2. n child processes: explore the environment and send the collected data back to the main process, where it is stored in the buffer;
  3. Child thread: created at the start of every episode to collect, in parallel, the data the child processes have just explored (i.e., the data produced in 2 is received through 3), and rejoined with the main process at the episode's end via the .join() method; a minimal sketch of the pipe pattern behind this follows the list.
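
The whole scheme rests on multiprocessing.Pipe: each child process holds one end of a duplex connection and pushes transitions through it, while the main process (or its child thread) polls its end non-blockingly and drains whatever has arrived. A minimal, self-contained sketch of that pattern, with an illustrative worker function and payload that are not from the repo:

from multiprocessing import Process, Pipe

def worker(conn):
    # child end: push a few payloads through the pipe
    for step in range(5):
        conn.send(("transition", step))

if __name__ == "__main__":
    main_end, child_end = Pipe()  # a duplex connection pair
    p = Process(target=worker, args=(child_end,))
    p.start()
    p.join()
    # main end: poll() is non-blocking, so only recv() what has already arrived
    while main_end.poll():
        print(main_end.recv())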

Partial code (main function)

Full code: https://github.com/ZYunfeii/DRL_algorithm_library/tree/master/TD3

#!/usr/bin/python
# -*- coding: utf-8 -*-
from td3 import TD3, ReplayBuffer
import gym
import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import Process, Pipe
from threading import Thread
from copy import deepcopy
import torch
from time import time
from PPO.draw import Painter

def main():
    env_name = 'Pendulum-v0'
    env = gym.make(env_name)
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    td3 = TD3(obs_dim,act_dim)
    """进程初始化"""
    process_num = 4
    pipe_dict = dict((i, (pipe1, pipe2)) for i in range(process_num) for pipe1, pipe2 in (Pipe(),))
    child_process_list = []
    for i in range(process_num):
        pro = Process(target=child_process, args=(pipe_dict[i][1],env_name))
        child_process_list.append(pro)
    child_pi = deepcopy(td3.ac.pi)
    [pipe_dict[i][0].send((child_pi.to("cpu"),)) for i in range(process_num)]

    """进程启动"""
    [p.start() for p in child_process_list]

    MAX_EPISODE = 150
    batch_size = 100

    begin = time()
    rewardList = list()
    timeList = list()
    """预先收集buffer"""
    expected_size = td3.replay_buffer.max_size/1000
    while td3.replay_buffer.size < expected_size:
        for i in range(process_num):
            if pipe_dict[i][0].poll():
                receive = pipe_dict[i][0].recv()
                td3.replay_buffer.store(*receive)
    print('==收集完成!')

    for episode in range(MAX_EPISODE):
        t = Thread(target=child_thread, args=(pipe_dict, process_num, td3))  # collect buffer data via a thread
        t.start()
        child_pi = deepcopy(td3.ac.pi)
        [pipe_dict[i][0].send((child_pi.to("cpu"),)) for i in range(process_num)]
        td3.update(batch_size, 100)
        ep_reward = test_pi(deepcopy(td3.ac.pi), env)
        rewardList.append(ep_reward)
        timeList.append(time() - begin)
        print(f"episode:{episode} reward:{ep_reward} time:{timeList[-1]}s")
        t.join()

    [p.terminate() for p in child_process_list]

    painter = Painter(load_csv=False)
    painter.addData(rewardList,'MP-TD3',x=timeList)
    painter.drawFigure()

def child_thread(pipe_dict, process_num, td3):
    count = 0
    while True:
        for i in range(process_num):
            if pipe_dict[i][0].poll():
                receive = pipe_dict[i][0].recv()
                td3.replay_buffer.store(*receive)
                count += 1
        if count > 200: break  # the thread collects `count` samples per episode



def child_process(pipe, env_name):
    env = gym.make(env_name)
    MAX_STEP = 500
    pi = pipe.recv()[0]  # block until the initial policy arrives from the main process
    act_dim = env.action_space.shape[0]
    while True:
        o = env.reset()
        for j in range(MAX_STEP):
            if pipe.poll():  # a fresher policy is available; swap it in
                pi = pipe.recv()[0]
            a = pi(torch.as_tensor(o, dtype=torch.float32, device="cpu")).detach().numpy()
            a += 0.15 * np.random.randn(act_dim)  # Gaussian exploration noise
            a = np.clip(a, -1, 1) * 2             # scale to Pendulum's action range [-2, 2]
            if np.random.rand() < 0.1:            # occasional fully random action
                a = env.action_space.sample()
            o2, r, d, _ = env.step(a)
            pipe.send((o, a, r, o2, d))           # ship the transition back to the main process
            o = o2
            if d: break

def test_pi(pi, env):
    o = env.reset()
    ep_reward = 0
    pi = pi.to("cpu")
    for j in range(500):
        # deterministic evaluation: no exploration noise, output scaled to [-2, 2]
        a = pi(torch.as_tensor(o, dtype=torch.float32, device="cpu")).detach().numpy() * 2
        o2, r, d, _ = env.step(a)
        o = o2
        ep_reward += r
        if d: break
    return ep_reward

if __name__ == "__main__":
    main()
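
The TD3 and ReplayBuffer classes come from the linked repo; the main loop only assumes that store(o, a, r, o2, d) appends one transition and that size/max_size track the fill level. As a rough, illustrative sketch of that interface (field names and sampling logic here are assumptions, not the repo's actual implementation):

import numpy as np

class SimpleReplayBuffer:
    """Minimal ring buffer matching the store(o, a, r, o2, d) calls above."""
    def __init__(self, obs_dim, act_dim, max_size=int(1e6)):
        self.obs  = np.zeros((max_size, obs_dim), dtype=np.float32)
        self.act  = np.zeros((max_size, act_dim), dtype=np.float32)
        self.rew  = np.zeros(max_size, dtype=np.float32)
        self.obs2 = np.zeros((max_size, obs_dim), dtype=np.float32)
        self.done = np.zeros(max_size, dtype=np.float32)
        self.ptr, self.size, self.max_size = 0, 0, max_size

    def store(self, o, a, r, o2, d):
        self.obs[self.ptr], self.act[self.ptr] = o, a
        self.rew[self.ptr], self.obs2[self.ptr], self.done[self.ptr] = r, o2, d
        self.ptr = (self.ptr + 1) % self.max_size  # overwrite the oldest data when full
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self, batch_size=100):
        idxs = np.random.randint(0, self.size, size=batch_size)
        return dict(obs=self.obs[idxs], act=self.act[idxs], rew=self.rew[idxs],
                    obs2=self.obs2[idxs], done=self.done[idxs])

Note that child_thread writes into the buffer while td3.update samples from it on the main thread; per-slot NumPy writes make this mostly benign under CPython's GIL, but guarding store and sample_batch with a threading.Lock would be the more defensive design.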

Results

Using the 4-process setup:
[Figure: reward (y-axis) vs. wall-clock time in seconds (x-axis), environment: gym's 'Pendulum-v0']

As the figure shows, training finishes in under half a minute.
