最近粗略学习了一下D*算法,论文中伪代码如下:
记号
1) G:终点
2) X:表示当前节点,用 b(X)=Y表示 Y是 X的下一个节点,称为后向指针(backpointer),通过后向指针,很容易获得一条从起点到终点的路径。
3) c(X,Y):从 X到 Y的cost,对于不存在边的两个节点, c(X,Y)可定义为无穷大。
4)同 A∗ 一样, D∗ 也用 OPEN表和 CLOSED表来保存待访问和已经访问的节点。对每个节点 X,用 t(X)来表示 X的状态,NEW表示未添加到 OPEN, OPEN表示当前处于 OPEN list, CLOSED表示已从 OPEN list剔除。
5)同 A∗ 一样, D∗ 也有一个对目标的估计函数, h(G,X),不同的是, D∗ 还多了一个key函数,对于在 OPEN list中的每个节点, k(G,X)的定义原文中为“be equal to the minimum of h(G,X) before modification and all valuses assumed by h(G,X) since X was placed on the OPEN list”。也就是说,h函数会随着拓扑结构的变化而变化,key始终是h中最小的那一个。key函数将 OPEN中的节点 X分为两类,一类记为 RAISE,如果 k ( G , X ) < h ( G , X ) ,另一类记为 LOWER,如果 k ( G , X ) = h ( G , X ) 。也就是说, k ( G , X ) ≤ h ( G , X ) ,小于的情况如前方出现障碍物导致某些边的cost增加,等于的情况如前方cost不变或由于障碍物移动导致某些边cost减小。
6) kmin和 kold,前者表示当前 OPEN中 key 的最小值,后者表示上一次的最小值。
7)如果一系列节点 {X_1,X_2,...,X_N} 满足 b(Xi+1)=Xi,则称为一个序列(sequence)。一个序列表示了一条从 XN到 X1的路径。在静态路径规划中,我们通常找的是一条从起点到终点的路径中,但是在动态路径规划中,起点是不断变化的,严格来讲应该是找寻一条从当前节点到终点的路径。显然这种情况下,考虑从终点到当前节点更为方便,故这里 X1反而是最靠近终点的节点。称一个序列为单调(monotonic)的如果满足 t ( X i ) = C L O S E D a n d h ( G , X i ) < h ( G , X i + 1 ) 或者 t ( X i ) = OPEN a n d k ( G , X i ) < h ( G , X i + 1 )。杂乱无的序列是没有意义的,单调的序列其实就是我们要找的一条合理的路径。
8)对 Xi,Xj,如果 i < j 则称 Xi 是 Xj 的祖先(ancestor),反之则称为后代(descendant)。
9)如果记号涉及两个节点且其中一个为终点,略去终点,如 f(X)=f(G,X)
算法描述
D∗主要包括两个部分,PROCESS−STATE 和 MODIFY−COST,前者用来计算终点到当前节点的最优cost,后者用来修正cost。主要流程如下:
1)将所有节点的 tag设置为 NEW, h(G)设为0,将G放置在 OPEN中。
2)重复调用 PROCESS−STATE直到当前节点 X从 OPEN中移除,即 t(X)=CLOSED,说明此时已经找到一条从X出发到达终点的路径。
3)根据2)中获得的路径,从节点 X往后移动,直到达到终点或者检测到cost发送变化。
4)当cost发生变化时,调用第二个函数 MODIFY−COST,并将因为障碍物而cost受到影响的节点重新放入 OPEN中。假设 Y是发现状态时机器人所处的节点。通过调用 PROCESS−STATE直到 kmin≥h(Y),此时cost的变化已经传播到 Y,因此可以找到一条新的从 Y到终点的最优路径。
代码实现
本文给出基于自己理解编写的Python实例,感觉对算法主体部分理解还不是很透彻,先做此记录。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/07/23
# @Author : 小仙女
# @Site :
# @File : D_star.py
# @Software: PyCharm
import math
from decimal import Decimal
class State(object):
def __init__(self, x, y):
self.x = x
self.y = y
self.parent = None
self.state = "."
self.t = "new"
self.h = 0
self.k = 0
def cost(self, state):
if self.state == "#" or state.state == "#":
return 100000 # 存在障碍物时,距离无穷大
return math.sqrt(math.pow((self.x - state.x), 2) +
math.pow((self.y - state.y), 2))
def set_state(self, state):
if state not in [".", "#", "S", "E", "+", "*", "&"]:
return
self.state = state
class Map(object):
'''
创建地图
'''
def __init__(self, row, col):
self.row = row
self.col = col
self.map = self.init_map()
def init_map(self):
# 初始化map
map_list = []
for i in range(self.row):
tmp = []
for j in range(self.col):
tmp.append(State(i, j))
map_list.append(tmp)
return map_list
def print_path(self):
for i in range(self.row):
tmp = ""
for j in range(self.col):
tmp += self.map[i][j].state + " "
print(tmp)
def print_state(self):
print("--------------------------------------------------------------------------------------------")
for i in range(self.row):
tmp_h = "| "
tmp_k = "| "
tmp_b = "| "
for j in range(self.col):
if self.map[i][j].h > 10000:
h = "inf "
else:
if self.map[i][j].h > 10:
h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.0')))
else:
h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.00')))
if self.map[i][j].k > 10000:
k = "inf "
else:
if self.map[i][j].k > 10:
k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.0')))
else:
k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.00')))
if self.map[i][j].parent:
parent = "(" + str(self.map[i][j].parent.x) + "," + str(self.map[i][j].parent.y) + ")"
else:
parent = "None "
tmp_h += "h:" + h + " | "
tmp_k += "k:" + k + " | "
tmp_b += "b:" + parent + " | "
print(tmp_h)
print(tmp_k)
print(tmp_b)
print("--------------------------------------------------------------------------------------------")
def get_neighbers(self, state):
# 获取8邻域
state_list = []
for i in [-1, 0, 1]:
for j in [-1, 0, 1]:
if i == 0 and j == 0:
continue
if state.x + i < 0 or state.x + i >= self.row:
continue
if state.y + j < 0 or state.y + j >= self.col:
continue
state_list.append(self.map[state.x + i][state.y + j])
return state_list
def set_obstacle(self, point_list):
# 设置障碍物的位置
for x, y in point_list:
if x < 0 or x >= self.row or y < 0 or y >= self.col:
continue
self.map[x][y].set_state("#")
def remove_obstacle(self, point_list):
# 移除障碍物
remove = set()
for x, y in point_list:
if x < 0 or x >= self.row or y < 0 or y >= self.col or self.map[x][y].state != "#":
continue
self.map[x][y].set_state(".")
for s in self.get_neighbers(self.map[x][y]):
if self.map[x][y].h > s.cost(self.map[x][y]) + s.h:
s.parent = self.map[x][y]
self.map[x][y].h = s.cost(self.map[x][y]) + s.h
remove.add(self.map[x][y])
return remove
def clear_path(self):
for i in range(self.row):
for j in range(self.col):
if self.map[i][j].state != "." and self.map[i][j].state != "#":
self.map[i][j].state = "."
class Dstar(object):
def __init__(self, maps):
self.map = maps
self.open_list = set() # 创建空集合
def process_state(self):
'''
D*算法的主要过程
:return:
'''
x = self.min_state() # 获取open list列表中最小k的节点
if x is None:
return -1
k_old = self.get_kmin() #获取open list列表中最小k节点的k值
self.remove(x) # 从openlist中移除
# print("")
# print('当前节点')
# print(x.x,x.y)
if k_old < x.h:
for y in self.map.get_neighbers(x):
if y.h <= k_old and x.h > y.h + x.cost(y):
# if y.t == "close" and x.h > y.h + x.cost(y):
x.parent = y
x.h = y.h + x.cost(y)
# print('修改x父节点降低h值')
# print(y.x,y.y)
if k_old == x.h:
for y in self.map.get_neighbers(x):
if (y.t == "new" or y.parent == x and y.h != x.h + x.cost(y) \
or y.parent != x and y.h > x.h + x.cost(y)) and y != end:
# if y.t == "new" and y != end:
# print('探索到新节点')
# print(y.x,y.y)
# if y.parent == x and y.h != x.h + x.cost(y) and y != end:
# print('传递节点x代价至y')
# print(y.x,y.y)
# if y.parent != x and y.h > x.h + x.cost(y) and y != end:
# print('修改y父节点降低h值')
# print(y.x,y.y)
y.parent = x
self.insert(y, x.h + x.cost(y))
else:
for y in self.map.get_neighbers(x):
if y.t == "new" or y.parent == x and y.h != x.h + x.cost(y):
y.parent = x
# print('新建/更新y父节点为x')
# print(y.x,y.y)
self.insert(y, x.h + x.cost(y))
else:
if y.parent != x and y.h > x.h + x.cost(y):
# print('插入x节点准备重新扩展')
# print(x.x,x.y)
self.insert(x, x.h)
else:
if y.parent != x and x.h > y.h + x.cost(y) \
and y.t == "close" and y.h > k_old:
# print('插入y节点准备重新扩展')
# print(y.x,y.y)
self.insert(y, y.h)
# print("")
return self.get_kmin()
def min_state(self):
if not self.open_list:
return None
# 获取openlist中k值最小对应的节点
min_state = min(self.open_list, key=lambda x: x.k)
return min_state
def get_kmin(self):
# 获取openlist表中最小的k
if not self.open_list:
return -1
k_min = min([x.k for x in self.open_list])
return k_min
def insert(self, state, h_new):
if state.t == "new":
state.k = h_new
elif state.t == "open":
state.k = min(state.k, h_new)
elif state.t == "close":
state.k = min(state.h, h_new)
state.h = h_new
state.t = "open"
self.open_list.add(state)
def remove(self, state):
if state.t == "open":
state.t = "close"
self.open_list.remove(state)
def modify_cost(self, x):
if x.t == "close":
self.insert(x, x.parent.h + x.cost(x.parent))
def run(self, start, end):
self.open_list.add(end)
while True:
if self.process_state() == -1:
break
if start.t == "close":
break
if start.t == "close":
start.set_state("S")
tmp = start.parent
while tmp != end:
tmp.set_state("+")
tmp = tmp.parent
tmp.set_state("E")
print('障碍物未发生变化时,搜索的路径如下:')
self.map.print_path()
# print('障碍物未发生变化时,代价地图如下:')
# self.map.print_state()
print("")
self.map.clear_path()
self.map.set_obstacle([(2, 3), (3, 3), [5, 3]]) # 障碍物发生变化 1
remove = self.map.remove_obstacle([(3, 2), (2, 1), (1, 2)])
# self.map.set_obstacle([(3, 3)]) # 障碍物发生变化 2
# remove = self.map.remove_obstacle([])
# self.map.set_obstacle([(3, 3), (2, 9)]) # 障碍物发生变化 3
# remove = self.map.remove_obstacle([])
for s in remove:
self.insert(s, s.h)
if start.t == "close":
tmp = start
while tmp != end and tmp != None:
tmp.set_state("*")
if tmp.parent.state == "#":
self.modify(tmp.parent)
print("探测到障碍物,重新规划")
if tmp.parent.state == "#":
break
continue
tmp = tmp.parent
start.set_state("S")
tmp.set_state("E")
if tmp.x == end.x and tmp.y == end.y:
print('障碍物发生变化时,搜索的路径如下:')
else:
print('[error] 动态寻路失败')
self.map.print_path()
# print('障碍物发生变化时,代价地图如下:')
# self.map.print_state()
def modify(self, state):
self.modify_cost(state)
while True:
k_min = self.process_state()
if k_min == -1 or k_min>10000:
break
if k_min >= state.h:
break
if __name__ == '__main__':
m = Map(6, 15)
m.set_obstacle([(0, 1), (0, 2), (1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (4, 3)])
start = m.map[5][1]
end = m.map[0][14]
dstar = Dstar(m)
dstar.run(start, end)