D*算法Python实例

        最近粗略学习了一下D*算法,论文中伪代码如下:

D*算法Python实例

记号

        1)  G:终点
        2)  X:表示当前节点,用 b(X)=Y表示  Y是 X的下一个节点,称为后向指针(backpointer),通过后向指针,很容易获得一条从起点到终点的路径。
        3) c(X,Y):从 X到 Y的cost,对于不存在边的两个节点, c(X,Y)可定义为无穷大。
        4)同 A∗ 一样, D∗ 也用 OPEN表和 CLOSED表来保存待访问和已经访问的节点。对每个节点 X,用 t(X)来表示 X的状态,NEW表示未添加到 OPEN, OPEN表示当前处于 OPEN list, CLOSED表示已从 OPEN list剔除。
        5)同 A∗ 一样, D∗ 也有一个对目标的估计函数, h(G,X),不同的是, D∗ 还多了一个key函数,对于在 OPEN list中的每个节点, k(G,X)的定义原文中为“be equal to the minimum of  h(G,X) before modification and all valuses assumed by h(G,X) since X was placed on the OPEN list”。也就是说,h函数会随着拓扑结构的变化而变化,key始终是h中最小的那一个。key函数将 OPEN中的节点 X分为两类,一类记为 RAISE,如果 k ( G , X ) < h ( G , X ) ,另一类记为 LOWER,如果 k ( G , X ) = h ( G , X ) 。也就是说, k ( G , X ) ≤ h ( G , X ) ,小于的情况如前方出现障碍物导致某些边的cost增加,等于的情况如前方cost不变或由于障碍物移动导致某些边cost减小。
        6) kmin​和 kold​,前者表示当前 OPEN中 key 的最小值,后者表示上一次的最小值。
        7)如果一系列节点 {X_1,X_2,...,X_N} ​满足 b(Xi+1​)=Xi​,则称为一个序列(sequence)。一个序列表示了一条从 XN​到 X1​的路径。在静态路径规划中,我们通常找的是一条从起点到终点的路径中,但是在动态路径规划中,起点是不断变化的,严格来讲应该是找寻一条从当前节点到终点的路径。显然这种情况下,考虑从终点到当前节点更为方便,故这里 X1​反而是最靠近终点的节点。称一个序列为单调(monotonic)的如果满足 t ( X i ) = C L O S E D   a n d   h ( G , X i ) < h ( G , X i + 1 ) 或者 t ( X i ) = OPEN   a n d   k ( G , X i ) < h ( G , X i + 1 )。杂乱无的序列是没有意义的,单调的序列其实就是我们要找的一条合理的路径。
        8)对 Xi​,Xj​,如果 i < j 则称 Xi​ 是 Xj​ 的祖先(ancestor),反之则称为后代(descendant)。
        9)如果记号涉及两个节点且其中一个为终点,略去终点,如 f(X)=f(G,X)

 

算法描述

        D∗主要包括两个部分,PROCESS−STATE 和 MODIFY−COST,前者用来计算终点到当前节点的最优cost,后者用来修正cost。主要流程如下:
        1)将所有节点的 tag设置为 NEW, h(G)设为0,将G放置在 OPEN中。
        2)重复调用 PROCESS−STATE直到当前节点 X从 OPEN中移除,即 t(X)=CLOSED,说明此时已经找到一条从X出发到达终点的路径。
        3)根据2)中获得的路径,从节点 X往后移动,直到达到终点或者检测到cost发送变化。
        4)当cost发生变化时,调用第二个函数 MODIFY−COST,并将因为障碍物而cost受到影响的节点重新放入 OPEN中。假设 Y是发现状态时机器人所处的节点。通过调用 PROCESS−STATE直到 kmin​≥h(Y),此时cost的变化已经传播到 Y,因此可以找到一条新的从 Y到终点的最优路径。

代码实现

        本文给出基于自己理解编写的Python实例,感觉对算法主体部分理解还不是很透彻,先做此记录。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2021/07/23
# @Author  : 小仙女
# @Site    :
# @File    : D_star.py
# @Software: PyCharm
 
import math
from decimal import Decimal

 
class State(object):
 
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None
        self.state = "."
        self.t = "new"
        self.h = 0
        self.k = 0
 
    def cost(self, state):
        if self.state == "#" or state.state == "#":
            return 100000  # 存在障碍物时,距离无穷大
        return math.sqrt(math.pow((self.x - state.x), 2) +
                         math.pow((self.y - state.y), 2))
 
    def set_state(self, state):
        if state not in [".", "#", "S", "E", "+", "*", "&"]:
            return
        self.state = state
 
 
class Map(object):
    '''
    创建地图
    '''
    def __init__(self, row, col):
        self.row = row
        self.col = col
        self.map = self.init_map()
 
    def init_map(self):
        # 初始化map
        map_list = []
        for i in range(self.row):
            tmp = []
            for j in range(self.col):
                tmp.append(State(i, j))
            map_list.append(tmp)
        return map_list
 
    def print_path(self):
        for i in range(self.row):
            tmp = ""
            for j in range(self.col):
                tmp += self.map[i][j].state + " "
            print(tmp)

    def print_state(self):
        print("--------------------------------------------------------------------------------------------")
        for i in range(self.row):
            tmp_h = "|   "
            tmp_k = "|   "
            tmp_b = "|   "
            for j in range(self.col):
                if self.map[i][j].h > 10000:
                    h = "inf "
                else:
                    if self.map[i][j].h > 10:
                        h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.0')))
                    else:
                        h = str(Decimal(self.map[i][j].h).quantize(Decimal('0.00')))       
                if self.map[i][j].k > 10000:
                    k = "inf "
                else:
                    if self.map[i][j].k > 10:
                        k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.0')))
                    else:
                        k = str(Decimal(self.map[i][j].k).quantize(Decimal('0.00')))   
                if self.map[i][j].parent:
                    parent = "(" + str(self.map[i][j].parent.x) + "," + str(self.map[i][j].parent.y) + ")"
                else:
                    parent = "None "
                tmp_h += "h:" + h + "   |   "
                tmp_k += "k:" + k + "   |   "
                tmp_b += "b:" + parent + "  |   "
            print(tmp_h)
            print(tmp_k)
            print(tmp_b)
            print("--------------------------------------------------------------------------------------------")
 
    def get_neighbers(self, state):
        # 获取8邻域
        state_list = []
        for i in [-1, 0, 1]:
            for j in [-1, 0, 1]:
                if i == 0 and j == 0:
                    continue
                if state.x + i < 0 or state.x + i >= self.row:
                    continue
                if state.y + j < 0 or state.y + j >= self.col:
                    continue
                state_list.append(self.map[state.x + i][state.y + j])
        return state_list
 
    def set_obstacle(self, point_list):
        # 设置障碍物的位置
        for x, y in point_list:
            if x < 0 or x >= self.row or y < 0 or y >= self.col:
                continue
            self.map[x][y].set_state("#")

    def remove_obstacle(self, point_list):
        # 移除障碍物
        remove = set()
        for x, y in point_list:
            if x < 0 or x >= self.row or y < 0 or y >= self.col or self.map[x][y].state != "#":
                continue
            self.map[x][y].set_state(".")
            for s in self.get_neighbers(self.map[x][y]):
                if self.map[x][y].h > s.cost(self.map[x][y]) + s.h:
                    s.parent = self.map[x][y]
                    self.map[x][y].h = s.cost(self.map[x][y]) + s.h
            remove.add(self.map[x][y])
        return remove


    def clear_path(self):
        for i in range(self.row):
            for j in range(self.col):
                if self.map[i][j].state != "." and self.map[i][j].state != "#":
                    self.map[i][j].state = "."

 
class Dstar(object):
 
    def __init__(self, maps):
        self.map = maps
        self.open_list = set()  # 创建空集合
 
    def process_state(self):
        '''
        D*算法的主要过程
        :return:
        '''
        x = self.min_state()    # 获取open list列表中最小k的节点
        if x is None:
            return -1
        k_old = self.get_kmin() #获取open list列表中最小k节点的k值
        self.remove(x)  # 从openlist中移除
        # print("")
        # print('当前节点')
        # print(x.x,x.y)
        if k_old < x.h:
            for y in self.map.get_neighbers(x):
                if y.h <= k_old and x.h > y.h + x.cost(y):
                # if y.t == "close" and x.h > y.h + x.cost(y):
                    x.parent = y
                    x.h = y.h + x.cost(y)
                    # print('修改x父节点降低h值')
                    # print(y.x,y.y)

        if k_old == x.h:
            for y in self.map.get_neighbers(x):
                if (y.t == "new" or y.parent == x and y.h != x.h + x.cost(y) \
                        or y.parent != x and y.h > x.h + x.cost(y)) and y != end:
                    # if y.t == "new" and y != end:
                        # print('探索到新节点')
                        # print(y.x,y.y)
                    # if y.parent == x and y.h != x.h + x.cost(y) and y != end:
                        # print('传递节点x代价至y')
                        # print(y.x,y.y)
                    # if y.parent != x and y.h > x.h + x.cost(y) and y != end:
                        # print('修改y父节点降低h值')
                        # print(y.x,y.y)
                    y.parent = x
                    self.insert(y, x.h + x.cost(y))
        else:
            for y in self.map.get_neighbers(x):
                if y.t == "new" or y.parent == x and y.h != x.h + x.cost(y):
                    y.parent = x
                    # print('新建/更新y父节点为x')
                    # print(y.x,y.y)
                    self.insert(y, x.h + x.cost(y))
                else:
                    if y.parent != x and y.h > x.h + x.cost(y):
                        # print('插入x节点准备重新扩展')
                        # print(x.x,x.y)
                        self.insert(x, x.h)
                    else:
                        if y.parent != x and x.h > y.h + x.cost(y) \
                                and y.t == "close" and y.h > k_old:
                            # print('插入y节点准备重新扩展')
                            # print(y.x,y.y)
                            self.insert(y, y.h)
        # print("")
        return self.get_kmin()
 
    def min_state(self):
        if not self.open_list:
            return None
        # 获取openlist中k值最小对应的节点
        min_state = min(self.open_list, key=lambda x: x.k)
        return min_state
 
    def get_kmin(self):
        # 获取openlist表中最小的k
        if not self.open_list:
            return -1
        k_min = min([x.k for x in self.open_list])
        return k_min
 
    def insert(self, state, h_new):
        if state.t == "new":
            state.k = h_new
        elif state.t == "open":
            state.k = min(state.k, h_new)
        elif state.t == "close":
            state.k = min(state.h, h_new)
        state.h = h_new
        state.t = "open"
        self.open_list.add(state)
 
    def remove(self, state):
        if state.t == "open":
            state.t = "close"
        self.open_list.remove(state)
 
    def modify_cost(self, x):
        if x.t == "close":
            self.insert(x, x.parent.h + x.cost(x.parent))
 
    def run(self, start, end):

        self.open_list.add(end)
        while True:
            if self.process_state() == -1:
                break
            if start.t == "close":
                break
        if start.t == "close":
            start.set_state("S")
            tmp = start.parent
            while tmp != end:
                tmp.set_state("+")
                tmp = tmp.parent
            tmp.set_state("E")
            print('障碍物未发生变化时,搜索的路径如下:')
            self.map.print_path()
            # print('障碍物未发生变化时,代价地图如下:')
            # self.map.print_state()
        
        print("")
        self.map.clear_path()
        self.map.set_obstacle([(2, 3), (3, 3), [5, 3]]) # 障碍物发生变化 1
        remove = self.map.remove_obstacle([(3, 2), (2, 1), (1, 2)])
        # self.map.set_obstacle([(3, 3)]) # 障碍物发生变化 2
        # remove = self.map.remove_obstacle([])
        # self.map.set_obstacle([(3, 3), (2, 9)]) # 障碍物发生变化 3
        # remove = self.map.remove_obstacle([])
        for s in remove:
            self.insert(s, s.h)
        if start.t == "close":
            tmp = start
            while tmp != end and tmp != None:
                tmp.set_state("*")
                if tmp.parent.state == "#":
                    self.modify(tmp.parent)
                    print("探测到障碍物,重新规划")
                    if tmp.parent.state == "#":
                        break
                    continue
                tmp = tmp.parent
            start.set_state("S")
            tmp.set_state("E")
            if tmp.x == end.x and tmp.y == end.y:
                print('障碍物发生变化时,搜索的路径如下:')
            else:
                print('[error] 动态寻路失败')
            self.map.print_path()
            # print('障碍物发生变化时,代价地图如下:')
            # self.map.print_state()
 
    def modify(self, state):
        self.modify_cost(state)
        while True:
            k_min = self.process_state()
            if k_min == -1 or k_min>10000:
                break
            if k_min >= state.h:
                break
 
 
if __name__ == '__main__':
    m = Map(6, 15)
    m.set_obstacle([(0, 1), (0, 2), (1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (4, 3)])
    start = m.map[5][1]
    end = m.map[0][14]
    dstar = Dstar(m)
    dstar.run(start, end)

 

上一篇:简·六级听力基础篇1


下一篇:文档分节