DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

DQN(Deep Q-learning)入门教程(四)之Q-learning Play Flappy Bird中,我们使用q-learning算法去对Flappy Bird进行强化学习,而在这篇博客中我们将使用神经网络模型来代替Q-table,关于DQN的介绍,可以参考我前一篇博客:DQN(Deep Q-learning)入门教程(五)之DQN介绍

在这篇博客中将使用DQN做如下操作:

  • Flappy Bird
  • MountainCar-v0

再回顾一下DQN的算法流程:

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

项目地址:Github

MountainCar-v0

MountainCar的训练好的Gif示意图如下所示,汽车起始位置位于山的底部,最终目标是驶向右边山的插旗的地方,其中,汽车的引擎不能够直接驶向终点,必须借助左边的山体的重力加速度才能够驶向终点。

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

MountainCar-v0由OpenAI提供,python包为gym,官网网站为https://gym.openai.com/envs/MountainCar-v0/。在Gym包中,提供了很多可以用于强化学习的环境(env):

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

在MountainCar-v0中,状态有2个变量,car position(汽车的位置),car vel(汽车的速度),action一共有3种 Accelerate to the Left Don‘t accelerateAccelerate to the Right,然后当车达到旗帜的地方(position = 0.5)会得到\(reward = 1\)的奖励,如果没有达到则为\(-1\)。但是如果当你运行步骤超过200次的时候,游戏就会结束。详情可以参考源代码(ps:官方文档中没有这些说明)。

下面介绍一下gym中几个常用的函数:

  • env = gym.make("MountainCar-v0")
    

    这个就是创建一个MountainCar-v0的游戏环境。

  • state = env.reset()
    

    重置环境,返回重置后的state

  • env.render()
    

    将运行画面展示在屏幕上面,当我们在训练的时候可以不使用这个来提升速度。

  • next_state, reward, done, _ = env.step(action)
    

    执行action动作,返回下一个状态,奖励,是否完成,info。

初始化Agent

初始化Agent直接使用代码说明吧,这个还是比较简单的:

import keras
import random
from collections import deque

import gym
import numpy as np
from keras.layers import Dense
from keras.models import Sequential


class Agent():
    def __init__(self, action_set, observation_space):
        """
        初始化
        :param action_set: 动作集合
        :param observation_space: 环境属性,我们需要使用它得到state的shape
        """
        # 奖励衰减
        self.gamma = 1.0
        # 从经验池中取出数据的数量
        self.batch_size = 50
        # 经验池
        self.memory = deque(maxlen=2000000)
        # 探索率
        self.greedy = 1.0
        # 动作集合
        self.action_set = action_set
        # 环境的属性
        self.observation_space = observation_space
        # 神经网路模型
        self.model = self.init_netWork()

    def init_netWork(self):
        """
        构建模型
        :return: 模型
        """
        model = Sequential()
        # self.observation_space.shape[0],state的变量的数量
        model.add(Dense(64 * 4, activation="tanh", input_dim=self.observation_space.shape[0]))
        model.add(Dense(64 * 4, activation="tanh"))
        # self.action_set.n 动作的数量
        model.add(Dense(self.action_set.n, activation="linear"))
        model.compile(loss=keras.losses.mean_squared_error,
                      optimizer=keras.optimizers.RMSprop(lr=0.001))
        return model

我们使用队列来保存经验,这样的话新的数据就会覆盖远古的数据。此时我们定义一个函数,专门用来将数据保存到经验池中,然后定义一个函数用来更新\(\epsilon\)探索率。

def add_memory(self, sample):
    self.memory.append(sample)

def update_greedy(self):
    # 小于最小探索率的时候就不进行更新了。
    if self.greedy > 0.01:
        self.greedy *= 0.995

训练模型

首先先看代码:

def train_model(self):
    # 从经验池中随机选择部分数据
    train_sample = random.sample(self.memory, k=self.batch_size)

    train_states = []
    next_states = []
    for sample in train_sample:
        cur_state, action, r, next_state, done = sample
        next_states.append(next_state)
        train_states.append(cur_state)

    # 转成np数组
    next_states = np.array(next_states)
    train_states = np.array(train_states)
    # 得到next_state的q值
    next_states_q = self.model.predict(next_states)

    # 得到state的预测值
    state_q = self.model.predict_on_batch(train_states)

    # 计算Q现实
    for index, sample in enumerate(train_sample):
        cur_state, action, r, next_state, done = sample
        if not done:
            state_q[index][action] = r + self.gamma * np.max(next_states_q[index])
        else:
            state_q[index][action] = r
    
    self.model.train_on_batch(train_states, state_q)

大家肯定从上面的代码发现一些问题,使用了两个for循环,why?首先先说一下两个for循环分别的作用:

  • 第一个for循环:得到train_statesnext_states,其中next_states是为了计算Q现实。
  • 第二个for循环:计算Q现实

可能有人会有一个疑问,为什么我不写成一个for循环呢?实际上写成一个for循环是完全没有问题的,很??,但是写成一个for循环意味着我们要多次调用model.predict_on_batch,这样会耗费一定的时间(亲身试验过,这样会比较慢),因此,我们写成了两个for循环,然后只需要调用一次predict

执行动作与选择最佳动作

执行动作的代码如下所示:

def act(self, env, action):
    """
    执行动作
    :param env: 执行环境
    :param action: 执行的动作
    :return: ext_state, reward, done
    """
    next_state, reward, done, _ = env.step(action)

    if done:
        if reward < 0:
            reward = -100
        else:
            reward = 10
    else:
        if next_state[0] >= 0.4:
            reward += 1

    return next_state, reward, done

其中,我们可以修改奖励以加快网络收敛。

选择最好的动作的动作如下所示,会以一定的探索率随机选择动作。

def get_best_action(self, state):
    if random.random() < self.greedy:
        return self.action_set.sample()
    else:
        return np.argmax(self.model.predict(state.reshape(-1, 2)))

开始训练

关于具体的解释,在注释中已经详细的说明了:

if __name__ == "__main__":
    # 训练次数
    episodes = 10000
    # 实例化游戏环境
    env = gym.make("MountainCar-v0")
    # 实例化Agent
    agent = Agent(env.action_space, env.observation_space)
    # 游戏中动作执行的次数(最大为200)
    counts = deque(maxlen=10)

    for episode in range(episodes):
        count = 0
        # 重置游戏
        state = env.reset()

        # 刚开始不立即更新探索率
        if episode >= 5:
            agent.update_greedy()

        while True:
            count += 1
            # 获得最佳动作
            action = agent.get_best_action(state)
            next_state, reward, done = agent.act(env, action)
            agent.add_memory((state, action, reward, next_state, done))
            # 刚开始不立即训练模型,先填充经验池
            if episode >= 5:
                agent.train_model()
            state = next_state
            if done:
                # 将执行的次数添加到counts中
                counts.append(count)
                print("在{}轮中,agent执行了{}次".format(episode + 1, count))

                # 如果近10次,动作执行的平均次数少于160,则保存模型并退出
                if len(counts) == 10 and np.mean(counts) < 160:
                    agent.model.save("car_model.h5")
                    exit(0)
                break

训练一定的次数后,我们就可以得到模型了。然后进行测试。

模型测试

测试的代码没什么好说的,如下所示:

import gym
from keras.models import load_model
import numpy as np
model = load_model("car_model.h5")

env = gym.make("MountainCar-v0")

for i in range(100):
    state = env.reset()
    count = 0
    while True:
        env.render()
        count += 1
        action = np.argmax(model.predict(state.reshape(-1, 2)))
        next_state, reward, done, _ = env.step(action)
        state = next_state
        if done:
            print("游戏的次数:", count)
            break

部分的结果如下:

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

Flappy Bird

FlappyBird的代码我就不过多赘述了,里面的一些函数介绍可以参照这个来看:DQN(Deep Q-learning)入门教程(四)之Q-learning Play Flappy Bird,代码思想与训练Mountain-Car基本是一致的。

import random
from collections import deque

import keras
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from ple import PLE
from ple.games import FlappyBird


class Agent():
    def __init__(self, action_set):
        self.gamma = 1
        self.model = self.init_netWork()
        self.batch_size = 128
        self.memory = deque(maxlen=2000000)
        self.greedy = 1
        self.action_set = action_set

    def get_state(self, state):
        """
        提取游戏state中我们需要的数据
        :param state: 游戏state
        :return: 返回提取好的数据
        """
        return_state = np.zeros((3,))
        dist_to_pipe_horz = state["next_pipe_dist_to_player"]
        dist_to_pipe_bottom = state["player_y"] - state["next_pipe_top_y"]
        velocity = state[‘player_vel‘]
        return_state[0] = dist_to_pipe_horz
        return_state[1] = dist_to_pipe_bottom
        return_state[2] = velocity
        return return_state

    def init_netWork(self):
        """
        构建模型
        :return:
        """
        model = Sequential()
        model.add(Dense(64 * 4, activation="tanh", input_shape=(3,)))
        model.add(Dense(64 * 4, activation="tanh"))
        model.add(Dense(2, activation="linear"))

        model.compile(loss=keras.losses.mean_squared_error,
                      optimizer=keras.optimizers.RMSprop(lr=0.001))
        return model

    def train_model(self):
        if len(self.memory) < 2500:
            return

        train_sample = random.sample(self.memory, k=self.batch_size)
        train_states = []
        next_states = []

        for sample in train_sample:
            cur_state, action, r, next_state, done = sample
            next_states.append(next_state)
            train_states.append(cur_state)
        # 转成np数组
        next_states = np.array(next_states)
        train_states = np.array(train_states)

        # 得到下一个state的q值
        next_states_q = self.model.predict(next_states)
        # 得到预测值
        state_q = self.model.predict_on_batch(train_states)

        for index, sample in enumerate(train_sample):
            cur_state, action, r, next_state, done = sample
            # 计算Q现实
            if not done:
                state_q[index][action] = r + self.gamma * np.max(next_states_q[index])
            else:
                state_q[index][action] = r
        self.model.train_on_batch(train_states, state_q)

    def add_memory(self, sample):
        self.memory.append(sample)

    def update_greedy(self):
        if self.greedy > 0.01:
            self.greedy *= 0.995

    def get_best_action(self, state):
        if random.random() < self.greedy:
            return random.randint(0, 1)
        else:
            return np.argmax(self.model.predict(state.reshape(-1, 3)))

    def act(self, p, action):
        """
        执行动作
        :param p: 通过p来向游戏发出动作命令
        :param action: 动作
        :return: 奖励
        """
        r = p.act(self.action_set[action])
        if r == 0:
            r = 1
        if r == 1:
            r = 100
        else:
            r = -1000
        return r


if __name__ == "__main__":
    # 训练次数
    episodes = 20000
    # 实例化游戏对象
    game = FlappyBird()
    # 类似游戏的一个接口,可以为我们提供一些功能
    p = PLE(game, fps=30, display_screen=False)
    # 初始化
    p.init()
    # 实例化Agent,将动作集传进去
    agent = Agent(p.getActionSet())
    max_score = 0
    scores = deque(maxlen=10)

    for episode in range(episodes):
        # 重置游戏
        p.reset_game()
        # 获得状态
        state = agent.get_state(game.getGameState())
        if episode > 150:
            agent.update_greedy()
        while True:
            # 获得最佳动作
            action = agent.get_best_action(state)
            # 然后执行动作获得奖励
            reward = agent.act(p, action)
            # 获得执行动作之后的状态
            next_state = agent.get_state(game.getGameState())
            agent.add_memory((state, action, reward, next_state, p.game_over()))
            agent.train_model()
            state = next_state
            if p.game_over():
                # 获得当前分数
                current_score = p.score()
                max_score = max(max_score, current_score)
                scores.append(current_score)
                print(‘第%s次游戏,得分为: %s,最大得分为: %s‘ % (episode, current_score, max_score))
                if len(scores) == 10 and np.mean(scores) > 150:
                    agent.model.save("bird_model.h5")
                    exit(0)
                break

该部分相比较于Mountain-Car需要更长的时间,目前的我还没有训练出比较好的效果,截至写完这篇博客,最新的数据如下所示:

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

emm,我又不想让我的电脑一直开着,??。

总结

上面的两个例子便是DQN最基本最基本的使用,我们还可以将上面的FlappyBird的问题稍微复杂化一点,比如说我们无法直接的知道环境的状态,我们则可以使用CNN网络去从游戏图片入手(关于这种做法,网络上有很多人写了相对应的博客)。

项目地址:Github

参考

DQN(Deep Q-learning)入门教程(六)之DQN Play Flappy-bird ,MountainCar

上一篇:记一次App中多进程初始化导致百度定位失效问题


下一篇:Appium自动化测试之IOS模拟器与真机