基于强化学习动态避障的Python实现
吐槽在前
这是我的研究生小课题,可是老师从头到尾没有理过我,只给了我一个题目,连稍微具体一点的要求都没提。那我就摸鱼摸爆
于是我进行了许多的简化,到最后做出了一个网格世界(GridWorld)的环境模型,在5*5的网格世界中,用一个格子表示我们的Agent,四个格子表示障碍(两个动态障碍两个静态障碍),一个格子表示目的地。(想法来自于Matlab的强化学习工具箱)
参考文章
主要参考这位大佬。
TA整合了另外两位大佬的文章:
https://blog.csdn.net/extremebingo/article/details/80867486
https://blog.csdn.net/gg_18826075157/article/details/78163386
环境
VS Code下:
Python 3.8
gym 忘记是多少,但是无所谓啦只要不是远古版本肯定没问题
训练环境设置
在大佬的基础上,将世界大小扩充至5*5,并加入两个会来回移动的障碍物,大致如图:
上图是初始状态下的环境,我们的agent在左上角出发,目的地设置为右下角,黑色的是静态障碍,它们不会随着时间移动,红色的是动态障碍,它们的移动速度和agent一样。我们把左边的动态障碍称为动态障碍1,右边的动态障碍称为动态障碍2。动态障碍1会沿着当前列上下来回移动,动态障碍2则只在当前位置到左边两格的范围来回移动,如图:
每个训练episode中,agent在每一个step都可以选择四个方向行进,撞墙了或者走的步数超过了20步则终止该次训练。
环境代码__init__(self):
用一个1×25的list来表示5×5的矩阵,从左上到右下分别表示为0~24(states),如此一来,我们的agent从0出发,目的地是24,静态障碍的位置分别为10和19,动态障碍1起始位置为21,动态障碍2起始位置为14。在该环境下,上走为-5,下为+5,左为-1,右为+1(actions)。在环境的class的__init__(self):下则有:
#------------------Initial states and directions--------------#
self.Terminal = 24 # 5*5 space represented by 1*25 matrix: 0 ~ 24
self.state = 0
self.StaticObs1 = 10
self.StaticObs2 = 19
self.DynamicObs1 = 21
self.DynamicObs2 = 14
self.actions = [0, 1, 2, 3] # Direction: Up_0, Down_1, Left_2, Right_3
self.Obs1Dir = 0
self.Obs2Dir = 2
self.gamma = 0.8
self.viewer = None
#-------------------------------------------------------------#
#------------------Transformation Dictionary------------------#
self.size = 5
self.T = dict()
for i in range(self.size, self.size * self.size):
self.T[str(i) + '_0'] = i - 5 # States that can go up
for i in range(self.size * (self.size - 1)):
self.T[str(i) + '_1'] = i + 5 # States that can go down
for i in range(1, self.size * self.size):
if i % self.size == 0:
continue
self.T[str(i) + '_2'] = i - 1 # States that can go left
for i in range(self.size * self.size):
if (i + 1) % self.size == 0:
continue
self.T[str(i) + '_3'] = i + 1 # States that can go right
#-------------------------------------------------------------#
在这里,我觉得有一个很精妙的地方,大佬们把状态states和行为actions用拼接的方法拼到了一起,组合成了state_action这样的key,以调用行动结果以及查找该state下做这个action的奖励值(reward):
#--------------------------Rewards----------------------------#
self.Rewards = dict() # Keys are combined with 'states' and '_actions'
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
#-------------------------------------------------------------#
撞到障碍物或目的地则该次训练结束:
#-----------------Episodes stopping criterion-----------------#
self.TerminateStates = dict() # When crash into obstacles or arrive terminal
self.TerminateStates[self.StaticObs1] = 1
self.TerminateStates[self.StaticObs2] = 1
self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1
self.TerminateStates[self.Terminal] = 1
#-------------------------------------------------------------#
为了让结果可视化,我们需要自己渲染结果,比如我打算设置一个700×700的窗口,那么,每一格的中心的横坐标为[150, 250, 350, 450, 550]重复5次(因为是一个1×25的list,每5个为环境的一行),相应地,纵坐标为150,250,350,450,550分别重复5次。
#------------------Coordinates for ploting--------------------#
self.x = [150,250,350,450,550] * 5
self.y = [550] * 5 + [450] * 5 + [350] * 5 + [250] * 5 + [150] * 5
#-------------------------------------------------------------#
以上,是环境初始化的__init__(self)函数的主要内容
接下来,是强化学习的关键,step函数和reset函数。
环境变化设置step(self, action):
step函数是会在主函数里疯狂调用的函数,它揭示了该环境下的一切变化。即在一个训练episode里,每进行一步,agent采取行动导致的变化,障碍如何移动以及障碍移动导致reward字典的变化都应该包含在step函数里。具体的思路是:在主函数里决定采取的action,然后传入step,在step里进行了障碍的移动和reward的更新后,判断该次行动带来的reward,以及此次训练是否需要结束等。故此函数需要返回的东西有:agent采取行动后的状态,奖励值,是否结束此次训练的flag。
首先,我们要判断agent是否碰上了障碍,我能想到的有两种情况:
- 在上一步中,agent和障碍已经靠在一起,而在采取行动后,障碍和agent的位置互换了。这种情况就是发生了正碰,迎头装上。
- 在上一步中,agent和障碍没有靠一起,但是它们在这一步过后,位置重叠了。这种情况就是发生了90°的碰撞,比如一个从下面来,一个从左边来,跑到了同一个地方。
所以我们需要记录障碍在行动前和行动后的两个状态,在环境的step(self,action):中则有:
self.temp = dict()
self.temp[self.DynamicObs1] = 1
self.temp[self.DynamicObs2] = 1
# Update terminate states
self.TerminateStates.pop(self.DynamicObs1)
self.TerminateStates.pop(self.DynamicObs2)
if self.Obs1Dir == 0:
if self.DynamicObs1 == 1:
self.Obs1Dir = 1
self.DynamicObs1 += 5
else:
self.DynamicObs1 -= 5
else:
if self.DynamicObs1 == 21:
self.DynamicObs1 -= 5
self.Obs1Dir = 0
else:
self.DynamicObs1 += 5
if self.Obs2Dir == 2:
if self.DynamicObs2 == 12:
self.DynamicObs2 += 1
self.Obs2Dir = 3
else:
self.DynamicObs2 -= 1
else:
if self.DynamicObs2 == 14:
self.Obs2Dir = 2
self.DynamicObs2 -= 1
else:
self.DynamicObs2 += 1
self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1
# Update rewards dictionary
self.Rewards = dict()
if self.DynamicObs1 == 21:
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
elif self.DynamicObs1 == 1:
self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
else:
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0
if self.DynamicObs2 == 14:
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
else:
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
至此,我们做好了一步之下,障碍的运动及记录和reward的更新。
接下来,我们要根据传入的action,来判断agent采取该action所得的reward:判断是否撞墙,判断是否撞障碍,判断是否接近了目标等。
state = self.state
key = "%d_%d"%(state,action)
# Dectect whether this action will lead to crashing into the wall
if key in self.T:
next_state = self.T[key]
else:
next_state = state
r = -200.0
is_Done = True
return next_state, r, is_Done, {}
# Dectect whether this action will lead to crashing into the obstacles
self.state = next_state # Update state
is_Done = False
if next_state in self.TerminateStates or (next_state in self.temp and state in self.TerminateStates):
is_Done = True
if key not in self.Rewards:
if (self.Terminal - next_state) < (self.Terminal - state):
r = 20.0
else:
r = -50.0
else:
r = self.Rewards[key]
return next_state, r, is_Done, {}
至此,step函数完成。
在这里我想多说一个我这样的小白容易犯的错误——一开始的时候,我几乎只给出惩罚reward没有给出奖励reward,这导致agent在尝试不多次都看不见终点的奖励以后,直接走进自闭,宁愿开局撞墙,拿个-200分,也不愿意再去尝试别的路子,因为我给它设定的世界里全是惩罚,只有看不见的终点有奖励。agent: 捏马,机生无望啊555 agent拿最高分的办法就是开局自鲨
环境变化设置reset(self):
reset函数也很重要,但是没啥好说的,就是充值一下各单位位置以及reward字典,直接在__init__(self):里复制粘贴就好
# Reset states and directions
self.Terminal = 24
self.state = 0
self.StaticObs1 = 10
self.StaticObs2 = 19
self.DynamicObs1 = 21
self.DynamicObs2 = 14
self.actions = [0, 1, 2, 3]
self.Obs1Dir = 0
self.Obs2Dir = 2
self.gamma = 0.8
# self.viewer = None
# Reset episodes stopping criterion
self.TerminateStates = dict()
self.TerminateStates[self.StaticObs1] = 1
self.TerminateStates[self.StaticObs2] = 1
self.TerminateStates[self.DynamicObs1] = 1
self.TerminateStates[self.DynamicObs2] = 1
self.TerminateStates[self.Terminal] = 1
# Reset rewards dictionary
self.Rewards = dict()
self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
return self
环境渲染设置render(self, mode = ‘human’):
这里应该没啥好说的,就是单纯的画图,认真看一下都能明白咋整的
from gym.envs.classic_control import rendering
screen_width = 700
screen_height = 700
if self.viewer is None:
self.viewer = rendering.Viewer(screen_width,screen_height)
# Plot the GridWorld
self.line1 = rendering.Line((100,100),(600,100))
self.line2 = rendering.Line((100, 200), (600, 200))
self.line3 = rendering.Line((100, 300), (600, 300))
self.line4 = rendering.Line((100, 400), (600, 400))
self.line5 = rendering.Line((100, 500), (600, 500))
self.line6 = rendering.Line((100, 600), (600, 600))
self.line7 = rendering.Line((100, 100), (100, 600))
self.line8 = rendering.Line((200, 100), (200, 600))
self.line9 = rendering.Line((300, 100), (300, 600))
self.line10 = rendering.Line((400, 100), (400, 600))
self.line11 = rendering.Line((500, 100), (500, 600))
self.line12 = rendering.Line((600, 100), (600, 600))
# Plot dynamic obstacle_1
self.obs1 = rendering.make_circle(40)
self.obs1trans = rendering.Transform() # translation=(250, 150)
self.obs1.add_attr(self.obs1trans)
self.obs1.set_color(1, 0, 0)
# Plot dynamic obstacle_2
self.obs2 = rendering.make_circle(40)
self.obs2trans = rendering.Transform()
self.obs2.add_attr(self.obs2trans)
self.obs2.set_color(1, 0, 0)
# Plot static obstacle_1
self.obstacle_1 = rendering.make_circle(40)
self.obstacle1trans = rendering.Transform()
self.obstacle_1.add_attr(self.obstacle1trans)
self.obstacle_1.set_color(0, 0, 0)
# Plot static obstacle_2
self.obstacle_2 = rendering.make_circle(40)
self.obstacle2trans = rendering.Transform()
self.obstacle_2.add_attr(self.obstacle2trans)
self.obstacle_2.set_color(0, 0, 0)
# Plot Terminal
self.terminal = rendering.make_circle(40)
self.circletrans = rendering.Transform(translation=(550, 150))
self.terminal.add_attr(self.circletrans)
self.terminal.set_color(0, 0, 1)
# Plot robot
self.robot= rendering.make_circle(30)
self.robotrans = rendering.Transform()
self.robot.add_attr(self.robotrans)
self.robot.set_color(0, 1, 0)
self.line1.set_color(0, 0, 0)
self.line2.set_color(0, 0, 0)
self.line3.set_color(0, 0, 0)
self.line4.set_color(0, 0, 0)
self.line5.set_color(0, 0, 0)
self.line6.set_color(0, 0, 0)
self.line7.set_color(0, 0, 0)
self.line8.set_color(0, 0, 0)
self.line9.set_color(0, 0, 0)
self.line10.set_color(0, 0, 0)
self.line11.set_color(0, 0, 0)
self.line12.set_color(0, 0, 0)
self.viewer.add_geom(self.line1)
self.viewer.add_geom(self.line2)
self.viewer.add_geom(self.line3)
self.viewer.add_geom(self.line4)
self.viewer.add_geom(self.line5)
self.viewer.add_geom(self.line6)
self.viewer.add_geom(self.line7)
self.viewer.add_geom(self.line8)
self.viewer.add_geom(self.line9)
self.viewer.add_geom(self.line10)
self.viewer.add_geom(self.line11)
self.viewer.add_geom(self.line12)
self.viewer.add_geom(self.obs1)
self.viewer.add_geom(self.obs2)
self.viewer.add_geom(self.obstacle_1)
self.viewer.add_geom(self.obstacle_2)
self.viewer.add_geom(self.terminal)
self.viewer.add_geom(self.robot)
if self.state is None:
return None
self.robotrans.set_translation(self.x[self.state], self.y[self.state])
self.obs1trans.set_translation(self.x[self.DynamicObs1], self.y[self.DynamicObs1])
self.obs2trans.set_translation(self.x[self.DynamicObs2], self.y[self.DynamicObs2])
self.obstacle1trans.set_translation(self.x[self.StaticObs1], self.y[self.StaticObs1])
self.obstacle2trans.set_translation(self.x[self.StaticObs2], self.y[self.StaticObs2])
return self.viewer.render(return_rgb_array=mode == 'rgb_array')
环境关闭close(self):
if self.viewer:
self.viewer.close()
self.viewer = None
至此,训练环境相关的设置已经彻底完成!
接下来,训练过程的代码和大佬们的大同小异。
基于Q-learning和Epsilon-greedy训练
值得一提的地方是:在训练过程中,判断本次训练结束后,我们需要在大佬的代码里再添加一个判断:如果不是因为到达终点而终止的训练的话,直接给一个-200.0的reward并更新q_table防止卡在一种死法上 。而更新q_table的步骤在action决策函数里面,故在if Done:下添加:
if observation != 24:
action, state = get_action(state, action, observation, -200.0, episode, 0.5)
以及删除下面一句我也没看懂的episode_reward = -100
其它的和大佬的代码基本一样啦!