基于强化学习动态避障的Python实现(绝赞摸鱼版)

基于强化学习动态避障的Python实现

吐槽在前

这是我的研究生小课题,可是老师从头到尾没有理过我,只给了我一个题目,连稍微具体一点的要求都没提。那我就摸鱼摸爆

于是我进行了许多的简化,到最后做出了一个网格世界(GridWorld)的环境模型,在5*5的网格世界中,用一个格子表示我们的Agent,四个格子表示障碍(两个动态障碍两个静态障碍),一个格子表示目的地。(想法来自于Matlab的强化学习工具箱

参考文章

主要参考这位大佬。
TA整合了另外两位大佬的文章:
https://blog.csdn.net/extremebingo/article/details/80867486
https://blog.csdn.net/gg_18826075157/article/details/78163386

环境

VS Code下:
Python 3.8
gym 忘记是多少,但是无所谓啦只要不是远古版本肯定没问题

训练环境设置

大佬的基础上,将世界大小扩充至5*5,并加入两个会来回移动的障碍物,大致如图:
基于强化学习动态避障的Python实现(绝赞摸鱼版)
上图是初始状态下的环境,我们的agent在左上角出发,目的地设置为右下角,黑色的是静态障碍,它们不会随着时间移动,红色的是动态障碍,它们的移动速度和agent一样。我们把左边的动态障碍称为动态障碍1,右边的动态障碍称为动态障碍2。动态障碍1会沿着当前列上下来回移动,动态障碍2则只在当前位置到左边两格的范围来回移动,如图:
基于强化学习动态避障的Python实现(绝赞摸鱼版)
每个训练episode中,agent在每一个step都可以选择四个方向行进,撞墙了或者走的步数超过了20步则终止该次训练。

环境代码__init__(self):

用一个1×25的list来表示5×5的矩阵,从左上到右下分别表示为0~24(states),如此一来,我们的agent从0出发,目的地是24,静态障碍的位置分别为10和19,动态障碍1起始位置为21,动态障碍2起始位置为14。在该环境下,上走为-5,下为+5,左为-1,右为+1(actions)。在环境的class的__init__(self):下则有:

#------------------Initial states and directions--------------#
self.Terminal = 24      # 5*5 space represented by 1*25 matrix: 0 ~ 24
self.state = 0
self.StaticObs1 = 10
self.StaticObs2 = 19
self.DynamicObs1 = 21
self.DynamicObs2 = 14
self.actions = [0, 1, 2, 3]     # Direction: Up_0, Down_1, Left_2, Right_3
self.Obs1Dir = 0
self.Obs2Dir = 2
self.gamma = 0.8
self.viewer = None
#-------------------------------------------------------------#

#------------------Transformation Dictionary------------------#
self.size = 5
self.T = dict()
for i in range(self.size, self.size * self.size):
	self.T[str(i) + '_0'] = i - 5   # States that can go up

for i in range(self.size * (self.size - 1)):
    self.T[str(i) + '_1'] = i + 5   # States that can go down

for i in range(1, self.size * self.size):
    if i % self.size == 0:
        continue
    self.T[str(i) + '_2'] = i - 1   # States that can go left  
        
for i in range(self.size * self.size):
    if (i + 1) % self.size == 0:
        continue
    self.T[str(i) + '_3'] = i + 1   # States that can go right
#-------------------------------------------------------------#

在这里,我觉得有一个很精妙的地方,大佬们把状态states和行为actions用拼接的方法拼到了一起,组合成了state_action这样的key,以调用行动结果以及查找该state下做这个action的奖励值(reward):

#--------------------------Rewards----------------------------#
self.Rewards = dict()   # Keys are combined with 'states' and '_actions'
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
#-------------------------------------------------------------#

撞到障碍物或目的地则该次训练结束:

#-----------------Episodes stopping criterion-----------------#
self.TerminateStates = dict()      # When crash into obstacles or arrive terminal
self.TerminateStates[self.StaticObs1] = 1
self.TerminateStates[self.StaticObs2] = 1
self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1
self.TerminateStates[self.Terminal] = 1
#-------------------------------------------------------------#

为了让结果可视化,我们需要自己渲染结果,比如我打算设置一个700×700的窗口,那么,每一格的中心的横坐标为[150, 250, 350, 450, 550]重复5次(因为是一个1×25的list,每5个为环境的一行),相应地,纵坐标为150,250,350,450,550分别重复5次。

#------------------Coordinates for ploting--------------------#
self.x = [150,250,350,450,550] * 5
self.y = [550] * 5 + [450] * 5 + [350] * 5 + [250] * 5 + [150] * 5
#-------------------------------------------------------------#

以上,是环境初始化的__init__(self)函数的主要内容
接下来,是强化学习的关键,step函数和reset函数。

环境变化设置step(self, action):

step函数是会在主函数里疯狂调用的函数,它揭示了该环境下的一切变化。即在一个训练episode里,每进行一步,agent采取行动导致的变化,障碍如何移动以及障碍移动导致reward字典的变化都应该包含在step函数里。具体的思路是:在主函数里决定采取的action,然后传入step,在step里进行了障碍的移动和reward的更新后,判断该次行动带来的reward,以及此次训练是否需要结束等。故此函数需要返回的东西有:agent采取行动后的状态,奖励值,是否结束此次训练的flag。

首先,我们要判断agent是否碰上了障碍,我能想到的有两种情况:

  1. 在上一步中,agent和障碍已经靠在一起,而在采取行动后,障碍和agent的位置互换了。这种情况就是发生了正碰,迎头装上。
  2. 在上一步中,agent和障碍没有靠一起,但是它们在这一步过后,位置重叠了。这种情况就是发生了90°的碰撞,比如一个从下面来,一个从左边来,跑到了同一个地方。

所以我们需要记录障碍在行动前和行动后的两个状态,在环境的step(self,action):中则有:

self.temp = dict()
self.temp[self.DynamicObs1] = 1
self.temp[self.DynamicObs2] = 1

# Update terminate states
self.TerminateStates.pop(self.DynamicObs1)
self.TerminateStates.pop(self.DynamicObs2)

if self.Obs1Dir == 0:
    if self.DynamicObs1 == 1:
        self.Obs1Dir = 1
        self.DynamicObs1 += 5
    else:
        self.DynamicObs1 -= 5
else:
    if self.DynamicObs1 == 21:
        self.DynamicObs1 -= 5
        self.Obs1Dir = 0
    else:
        self.DynamicObs1 += 5

if self.Obs2Dir == 2:
    if self.DynamicObs2 == 12:
        self.DynamicObs2 += 1
        self.Obs2Dir = 3
    else:
        self.DynamicObs2 -= 1
else:
    if self.DynamicObs2 == 14:
        self.Obs2Dir = 2
        self.DynamicObs2 -= 1
    else:
        self.DynamicObs2 += 1

self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1

# Update rewards dictionary
self.Rewards = dict()
if self.DynamicObs1 == 21:
    self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
elif self.DynamicObs1 == 1:
    self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0
    self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
else:
    self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0

if self.DynamicObs2 == 14:
    self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
else:
    self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs2 + 1) + '_2'] = -200.0
    self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0

self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0

至此,我们做好了一步之下,障碍的运动及记录和reward的更新。

接下来,我们要根据传入的action,来判断agent采取该action所得的reward:判断是否撞墙,判断是否撞障碍,判断是否接近了目标等。

state = self.state
key = "%d_%d"%(state,action)

# Dectect whether this action will lead to crashing into the wall
if key in self.T:
    next_state = self.T[key]
else:
    next_state = state
    r = -200.0
    is_Done = True
    return next_state, r, is_Done, {}

# Dectect whether this action will lead to crashing into the obstacles
self.state = next_state     # Update state
is_Done = False
if next_state in self.TerminateStates or (next_state in self.temp and state in self.TerminateStates):
    is_Done = True

if key not in self.Rewards:
    if (self.Terminal - next_state) < (self.Terminal - state):
        r = 20.0
    else:
        r = -50.0
else:
    r = self.Rewards[key]

return next_state, r, is_Done, {}

至此,step函数完成。

在这里我想多说一个我这样的小白容易犯的错误——一开始的时候,我几乎只给出惩罚reward没有给出奖励reward,这导致agent在尝试不多次都看不见终点的奖励以后,直接走进自闭,宁愿开局撞墙,拿个-200分,也不愿意再去尝试别的路子,因为我给它设定的世界里全是惩罚,只有看不见的终点有奖励。agent: 捏马,机生无望啊555 agent拿最高分的办法就是开局自鲨

环境变化设置reset(self):

reset函数也很重要,但是没啥好说的,就是充值一下各单位位置以及reward字典,直接在__init__(self):里复制粘贴就好

# Reset states and directions
self.Terminal = 24
self.state = 0
self.StaticObs1 = 10
self.StaticObs2 = 19
self.DynamicObs1 = 21
self.DynamicObs2 = 14
self.actions = [0, 1, 2, 3]
self.Obs1Dir = 0
self.Obs2Dir = 2
self.gamma = 0.8
# self.viewer = None

# Reset episodes stopping criterion
self.TerminateStates = dict()
self.TerminateStates[self.StaticObs1] = 1
self.TerminateStates[self.StaticObs2] = 1
self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1
self.TerminateStates[self.Terminal] = 1

# Reset rewards dictionary
self.Rewards = dict()
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0

return self

环境渲染设置render(self, mode = ‘human’):

这里应该没啥好说的,就是单纯的画图,认真看一下都能明白咋整的

from gym.envs.classic_control import rendering
screen_width = 700
screen_height = 700

if self.viewer is None:
    self.viewer = rendering.Viewer(screen_width,screen_height)

    # Plot the GridWorld
    self.line1 = rendering.Line((100,100),(600,100))
    self.line2 = rendering.Line((100, 200), (600, 200))
    self.line3 = rendering.Line((100, 300), (600, 300))
    self.line4 = rendering.Line((100, 400), (600, 400))
    self.line5 = rendering.Line((100, 500), (600, 500))
    self.line6 = rendering.Line((100, 600), (600, 600))

    self.line7 = rendering.Line((100, 100), (100, 600))
    self.line8 = rendering.Line((200, 100), (200, 600))
    self.line9 = rendering.Line((300, 100), (300, 600))
    self.line10 = rendering.Line((400, 100), (400, 600))
    self.line11 = rendering.Line((500, 100), (500, 600))
    self.line12 = rendering.Line((600, 100), (600, 600))


    # Plot dynamic obstacle_1
    self.obs1 = rendering.make_circle(40)
    self.obs1trans = rendering.Transform()    # translation=(250, 150)
    self.obs1.add_attr(self.obs1trans)
    self.obs1.set_color(1, 0, 0)

    # Plot dynamic obstacle_2
    self.obs2 = rendering.make_circle(40)
    self.obs2trans = rendering.Transform()
    self.obs2.add_attr(self.obs2trans)
    self.obs2.set_color(1, 0, 0)

    # Plot static obstacle_1
    self.obstacle_1 = rendering.make_circle(40)
    self.obstacle1trans = rendering.Transform()
    self.obstacle_1.add_attr(self.obstacle1trans)
    self.obstacle_1.set_color(0, 0, 0)

    # Plot static obstacle_2
    self.obstacle_2 = rendering.make_circle(40)
    self.obstacle2trans = rendering.Transform()
    self.obstacle_2.add_attr(self.obstacle2trans)
    self.obstacle_2.set_color(0, 0, 0)

    # Plot Terminal
    self.terminal = rendering.make_circle(40)
    self.circletrans = rendering.Transform(translation=(550, 150))
    self.terminal.add_attr(self.circletrans)
    self.terminal.set_color(0, 0, 1)

    # Plot robot
    self.robot= rendering.make_circle(30)
    self.robotrans = rendering.Transform()
    self.robot.add_attr(self.robotrans)
    self.robot.set_color(0, 1, 0)

    self.line1.set_color(0, 0, 0)
    self.line2.set_color(0, 0, 0)
    self.line3.set_color(0, 0, 0)
    self.line4.set_color(0, 0, 0)
    self.line5.set_color(0, 0, 0)
    self.line6.set_color(0, 0, 0)
    self.line7.set_color(0, 0, 0)
    self.line8.set_color(0, 0, 0)
    self.line9.set_color(0, 0, 0)
    self.line10.set_color(0, 0, 0)
    self.line11.set_color(0, 0, 0)
    self.line12.set_color(0, 0, 0)

    self.viewer.add_geom(self.line1)
    self.viewer.add_geom(self.line2)
    self.viewer.add_geom(self.line3)
    self.viewer.add_geom(self.line4)
    self.viewer.add_geom(self.line5)
    self.viewer.add_geom(self.line6)
    self.viewer.add_geom(self.line7)
    self.viewer.add_geom(self.line8)
    self.viewer.add_geom(self.line9)
    self.viewer.add_geom(self.line10)
    self.viewer.add_geom(self.line11)
    self.viewer.add_geom(self.line12)
    self.viewer.add_geom(self.obs1)
    self.viewer.add_geom(self.obs2)
    self.viewer.add_geom(self.obstacle_1)
    self.viewer.add_geom(self.obstacle_2)
    self.viewer.add_geom(self.terminal)
    self.viewer.add_geom(self.robot)

if self.state is None:
    return None

self.robotrans.set_translation(self.x[self.state], self.y[self.state])
self.obs1trans.set_translation(self.x[self.DynamicObs1], self.y[self.DynamicObs1])
self.obs2trans.set_translation(self.x[self.DynamicObs2], self.y[self.DynamicObs2])
self.obstacle1trans.set_translation(self.x[self.StaticObs1], self.y[self.StaticObs1])
self.obstacle2trans.set_translation(self.x[self.StaticObs2], self.y[self.StaticObs2])
return self.viewer.render(return_rgb_array=mode == 'rgb_array')

环境关闭close(self):

if self.viewer:
    self.viewer.close()
    self.viewer = None

至此,训练环境相关的设置已经彻底完成!

接下来,训练过程的代码和大佬们的大同小异。

基于Q-learning和Epsilon-greedy训练

值得一提的地方是:在训练过程中,判断本次训练结束后,我们需要在大佬的代码里再添加一个判断:如果不是因为到达终点而终止的训练的话,直接给一个-200.0的reward并更新q_table防止卡在一种死法上 。而更新q_table的步骤在action决策函数里面,故在if Done:下添加:

if observation != 24:
    action, state = get_action(state, action, observation, -200.0, episode, 0.5)

以及删除下面一句我也没看懂的episode_reward = -100
其它的和大佬的代码基本一样啦!

上一篇:45 The Effect of External Rewards on Behavior 外界奖励对行为的影响


下一篇:第二篇 Flask基础篇之(闪现,蓝图,请求扩展,中间件)