强化学习实现四旋翼无人机的路径选择(Gym + Gym-Gazebo + Gazebo + ROS)

1、安装 Gym、Gym-Gazebo、Gazebo 和 ROS

2、环境配置:

      创建 ROS 工作空间,编译完成后(此时才会生成 devel 目录)执行 source devel/setup.bash

3、 编写launch文件,并将launch文件放在.../gym_gazebo/assets/目录下

<launch>
   <!-- World file to load; defaults to aircraft_wall.world from the aircraft_start package. -->
   <arg name="world_file"  default="$(find aircraft_start)/worlds/aircraft_wall.world"/>
   <!-- Reuse gazebo_ros' empty_world.launch, pointing it at the world file above. -->
   <include file="$(find gazebo_ros)/launch/empty_world.launch">
      <arg name="use_sim_time" value="true"/>
      <arg name="paused" value="true"/>
      <arg name="gui" value="true"/>
      <arg name="world_name" value="$(arg world_file)"/>
      <arg name="debug" value="false"/>
  </include>
  <!-- Everything for the quadrotor lives under the aircraft_1 namespace. -->
  <group ns="aircraft_1">
    <!-- Robot description: the camera-equipped quadrotor xacro from hector_quadrotor_description. -->
    <arg name="model" value="$(find hector_quadrotor_description)/urdf/quadrotor_with_cam1.gazebo.xacro"/>
    <!-- Spawn the quadrotor via hector_quadrotor_gazebo at position (-2.5, -2.5, 1.5). -->
    <include file="$(find hector_quadrotor_gazebo)/launch/spawn_quadrotor.launch">
      <arg name="name" value="aircraft_1" />
      <arg name="tf_prefix" value="aircraft_1" />
      <arg name="model" value="$(arg model)" />
      <arg name="x" value="-2.5" />
      <arg name="y" value="-2.5" />
      <arg name="z" value ="1.5" />  
    </include>
  </group>
   <!-- Start the aircraft_up node from the aircraft_start package, logging to the console. -->
   <!-- NOTE(review): presumably this node makes the quadrotor take off; confirm in aircraft_start. -->
   <node name = "aircraft_up" pkg = "aircraft_start" type="aircraft_up" output="screen"/>
</launch>

4、编写aircraft_wall(在env目录下面自建)功能包里面的__init__.py 和aircraft_wall_env.py

#aircraft_wall_env.py文件(注意:下面贴出的代码与第5步中的 aircraft_wall.py 训练脚本完全相同,其中并未定义 __init__.py 所导入的 AircraftWallEnv 环境类)
#!/usr/bin/env python
import gym
from gym import wrappers
#from gym import wrappers
import gym_gazebo
import time
import numpy
import random
import time

import qlearn
import liveplot

def render():
    """Toggle environment rendering on a fixed episode schedule.

    Reads the module-level globals ``x`` (current episode index) and ``env``
    (the gym environment). Every ``every`` episodes the render window is
    shown; ``shown`` episodes after that it is closed again. Episode 0 and
    any episode index not greater than ``skip_first`` never render.
    """
    skip_first = 0  # Skip first X episodes.
    every = 50      # Show render every Y episodes.
    shown = 10      # Show Z episodes every rendering.

    # Both branches below share these preconditions, so check them once.
    if x == 0 or x <= skip_first:
        return

    if x % every == 0:
        env.render()
    elif (x - shown) % every == 0 and shown < x:
        env.render(close=True)

# Training entry point: runs tabular Q-learning (qlearn.QLearn) against the
# 'aircraftwall-v0' gym environment for `total_episodes` episodes.
if __name__ == '__main__':

    env = gym.make('aircraftwall-v0')  
    outdir = '/tmp/gazebo_gym_experiments'
    # Wrap the environment in gym's Monitor, which is given `outdir` as its
    # output directory. NOTE(review): force=True presumably overwrites earlier
    # results in that directory -- confirm against the gym version in use.
    env = gym.wrappers.Monitor(env, outdir, force=True)
    plotter = liveplot.LivePlot(outdir)  # only used by the commented-out plot call below
    last_time_steps = numpy.ndarray(0)  # lengths of the episodes that ended with done=True
    # NOTE: this rebinds the name `qlearn` from the imported module to the
    # agent instance; the module is no longer reachable by that name afterwards.
    qlearn = qlearn.QLearn(actions=range(env.action_space.n),alpha=0.2, gamma=0.8, epsilon=0.9)
    initial_epsilon = qlearn.epsilon  # only used by the commented-out summary print
    epsilon_discount = 0.9986  # multiplicative per-episode decay of epsilon
    
    start_time = time.time()
    total_episodes = 10000
    highest_reward = 0  # largest running cumulative reward seen in any episode so far
    
    for x in range(total_episodes):
        done = False

        # Running reward total for the current episode.
        # Open question from the author: should going forward give more reward than L/R?
        cumulated_reward = 0

        observation = env.reset()

        # Decay the exploration rate once per episode while it is above 0.05.
        if qlearn.epsilon > 0.05:
            qlearn.epsilon *= epsilon_discount

        #render() #defined above, not env.render()

        # The Q-table key for a state is the concatenated string form of every
        # element of the observation.
        state = ''.join(map(str, observation))

        # Each episode is capped at 1500 steps.
        for i in range(1500):

            # Pick an action based on the current state
            action = qlearn.chooseAction(state)

            # Execute the action and get feedback
            observation, reward, done, info = env.step(action)
            cumulated_reward += reward

            if highest_reward < cumulated_reward:
                highest_reward = cumulated_reward

            nextState = ''.join(map(str, observation))

            # Q-table update for the transition just observed.
            qlearn.learn(state, action, reward, nextState)

            # NOTE(review): `_flush` is a private Monitor method; presumably it
            # forces the monitor to write its statistics out -- confirm.
            env._flush(force=True)

            if not(done):
                state = nextState
            else:
                # Record the episode length (steps are 1-based). Episodes that
                # hit the 1500-step cap without `done` are not recorded.
                last_time_steps = numpy.append(last_time_steps, [int(i + 1)])
                break

        # if x%100==0:
        #     plotter.plot(env)

        # Elapsed wall-clock time split into h/m/s; only consumed by the
        # commented-out progress print below.
        m, s = divmod(int(time.time() - start_time), 60)
        h, m = divmod(m, 60)
        # print ("EP: "+str(x+1)+" - [alpha: "+str(round(qlearn.alpha,2))+" - gamma: "+str(round(qlearn.gamma,2))+" - epsilon: "+str(round(qlearn.epsilon,2))+"] - Reward: "+str(cumulated_reward)+"     Time: %d:%02d:%02d" % (h, m, s))

    #Github table content
    # print ("\n|"+str(total_episodes)+"|"+str(qlearn.alpha)+"|"+str(qlearn.gamma)+"|"+str(initial_epsilon)+"*"+str(epsilon_discount)+"|"+str(highest_reward)+"| PICTURE |")

    # Sorted episode lengths; only consumed by the commented-out score prints below.
    l = last_time_steps.tolist()
    l.sort()

    #print("Parameters: a="+str)
    # print("Overall score: {:0.2f}".format(last_time_steps.mean()))
    # print("Best 100 score: {:0.2f}".format(reduce(lambda x, y: x + y, l[-100:]) / len(l[-100:])))

    env.close()
#__init__.py文件
from gym_gazebo.envs.aircraft_wall.aircraft_wall_env import AircraftWallEnv

5、编写aircraft_wall(在../gym/example自建)的aircraft_wall.py和qlearn.py

#qlearn.py文件
import random

class QLearn:
    """Tabular Q-learning agent with noise-based exploration.

    The Q-table is a plain dict keyed by ``(state, action)``; pairs that have
    never been updated are treated as having a value of 0.0.

    Args:
        actions: indexable sequence of the available actions.
        epsilon: probability of perturbing the Q-values before picking an
            action (exploration rate).
        alpha: learning rate, i.e. the step size of each Q-value update.
        gamma: discount factor applied to the best next-state value.
    """

    def __init__(self, actions, epsilon, alpha, gamma):
        self.actions = actions
        self.epsilon = epsilon  # exploration rate
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount factor
        self.q = {}             # (state, action) -> estimated value

    def getQ(self, state, action):
        """Return the stored value for ``(state, action)``, or 0.0 if unseen."""
        key = (state, action)
        return self.q.get(key, 0.0)

    def learnQ(self, state, action, reward, value):
        """Move the Q-value of ``(state, action)`` towards ``value``.

        A pair seen for the first time is initialised to ``reward``.
        Afterwards the usual incremental update is applied:

            Q(s, a) <- Q(s, a) + alpha * (value - Q(s, a))

        where ``learn`` passes ``value = reward + gamma * max_a' Q(s', a')``.
        """
        key = (state, action)
        previous = self.q.get(key, None)
        if previous is None:
            self.q[key] = reward
            return
        self.q[key] = previous + self.alpha * (value - previous)

    def chooseAction(self, state, return_q=False):
        """Pick an action for ``state``.

        With probability ``epsilon`` every Q-value is first perturbed by
        uniform noise in ``[-mag/2, mag/2)``, where ``mag`` is the largest
        absolute Q-value for this state. The action with the highest
        (possibly perturbed) value wins; ties are broken at random.

        Args:
            state: the state to act in.
            return_q: when true, also return the list of (possibly
                perturbed) Q-values that the choice was based on.

        Returns:
            The chosen action, or ``(action, q_values)`` if ``return_q``.
        """
        values = [self.getQ(state, candidate) for candidate in self.actions]
        top = max(values)

        if random.random() < self.epsilon:
            low = min(values)
            mag = max(abs(low), abs(top))
            # Add random noise to every action value, then recompute the max.
            values = [v + random.random() * mag - .5 * mag for v in values]
            top = max(values)

        if values.count(top) > 1:
            # Several actions share the best value: pick one of them at random.
            tied = [idx for idx, v in enumerate(values) if v == top]
            chosen = random.choice(tied)
        else:
            chosen = values.index(top)

        action = self.actions[chosen]
        return (action, values) if return_q else action

    def learn(self, state1, action1, reward, state2):
        """Update the table for the transition ``state1 --action1--> state2``."""
        best_next = max([self.getQ(state2, candidate) for candidate in self.actions])
        target = reward + self.gamma*best_next
        self.learnQ(state1, action1, reward, target)

#aircraft_wall.py
#!/usr/bin/env python
import gym
from gym import wrappers
#from gym import wrappers
import gym_gazebo
import time
import numpy
import random
import time

import qlearn
import liveplot

def render():
    """Toggle environment rendering on a fixed episode schedule.

    Reads the module-level globals ``x`` (current episode index) and ``env``
    (the gym environment). Every ``every`` episodes the render window is
    shown; ``shown`` episodes after that it is closed again. Episode 0 and
    any episode index not greater than ``skip_first`` never render.
    """
    skip_first = 0  # Skip first X episodes.
    every = 50      # Show render every Y episodes.
    shown = 10      # Show Z episodes every rendering.

    # Both branches below share these preconditions, so check them once.
    if x == 0 or x <= skip_first:
        return

    if x % every == 0:
        env.render()
    elif (x - shown) % every == 0 and shown < x:
        env.render(close=True)

# Training entry point: runs tabular Q-learning (qlearn.QLearn) against the
# 'aircraftwall-v0' gym environment for `total_episodes` episodes.
if __name__ == '__main__':

    env = gym.make('aircraftwall-v0')  
    outdir = '/tmp/gazebo_gym_experiments'
    # Wrap the environment in gym's Monitor, which is given `outdir` as its
    # output directory. NOTE(review): force=True presumably overwrites earlier
    # results in that directory -- confirm against the gym version in use.
    env = gym.wrappers.Monitor(env, outdir, force=True)
    plotter = liveplot.LivePlot(outdir)  # only used by the commented-out plot call below
    last_time_steps = numpy.ndarray(0)  # lengths of the episodes that ended with done=True
    # NOTE: this rebinds the name `qlearn` from the imported module to the
    # agent instance; the module is no longer reachable by that name afterwards.
    qlearn = qlearn.QLearn(actions=range(env.action_space.n),alpha=0.2, gamma=0.8, epsilon=0.9)
    initial_epsilon = qlearn.epsilon  # only used by the commented-out summary print
    epsilon_discount = 0.9986  # multiplicative per-episode decay of epsilon
    
    start_time = time.time()
    total_episodes = 10000
    highest_reward = 0  # largest running cumulative reward seen in any episode so far
    
    for x in range(total_episodes):
        done = False

        # Running reward total for the current episode.
        # Open question from the author: should going forward give more reward than L/R?
        cumulated_reward = 0

        observation = env.reset()

        # Decay the exploration rate once per episode while it is above 0.05.
        if qlearn.epsilon > 0.05:
            qlearn.epsilon *= epsilon_discount

        #render() #defined above, not env.render()

        # The Q-table key for a state is the concatenated string form of every
        # element of the observation.
        state = ''.join(map(str, observation))

        # Each episode is capped at 1500 steps.
        for i in range(1500):

            # Pick an action based on the current state
            action = qlearn.chooseAction(state)

            # Execute the action and get feedback
            observation, reward, done, info = env.step(action)
            cumulated_reward += reward

            if highest_reward < cumulated_reward:
                highest_reward = cumulated_reward

            nextState = ''.join(map(str, observation))

            # Q-table update for the transition just observed.
            qlearn.learn(state, action, reward, nextState)

            # NOTE(review): `_flush` is a private Monitor method; presumably it
            # forces the monitor to write its statistics out -- confirm.
            env._flush(force=True)

            if not(done):
                state = nextState
            else:
                # Record the episode length (steps are 1-based). Episodes that
                # hit the 1500-step cap without `done` are not recorded.
                last_time_steps = numpy.append(last_time_steps, [int(i + 1)])
                break

        # if x%100==0:
        #     plotter.plot(env)

        # Elapsed wall-clock time split into h/m/s; only consumed by the
        # commented-out progress print below.
        m, s = divmod(int(time.time() - start_time), 60)
        h, m = divmod(m, 60)
        # print ("EP: "+str(x+1)+" - [alpha: "+str(round(qlearn.alpha,2))+" - gamma: "+str(round(qlearn.gamma,2))+" - epsilon: "+str(round(qlearn.epsilon,2))+"] - Reward: "+str(cumulated_reward)+"     Time: %d:%02d:%02d" % (h, m, s))

    #Github table content
    # print ("\n|"+str(total_episodes)+"|"+str(qlearn.alpha)+"|"+str(qlearn.gamma)+"|"+str(initial_epsilon)+"*"+str(epsilon_discount)+"|"+str(highest_reward)+"| PICTURE |")

    # Sorted episode lengths; only consumed by the commented-out score prints below.
    l = last_time_steps.tolist()
    l.sort()

    #print("Parameters: a="+str)
    # print("Overall score: {:0.2f}".format(last_time_steps.mean()))
    # print("Best 100 score: {:0.2f}".format(reduce(lambda x, y: x + y, l[-100:]) / len(l[-100:])))

    env.close()

6、在gym_gazebo下面的__init__.py文件中添加:


# Register the environment under the id 'aircraftwall-v0', with the
# AircraftWallEnv class in gym_gazebo.envs.aircraft_wall as its entry point.
# This is the id the training script passes to gym.make().
# NOTE(review): `register` is presumably gym's registration function already
# imported at the top of gym_gazebo/__init__.py -- confirm.
register(
    id='aircraftwall-v0',
    entry_point='gym_gazebo.envs.aircraft_wall:AircraftWallEnv',
)

7、source 工作空间(source devel/setup.bash)后,执行 python aircraft_wall.py 运行;若想直接以 ./aircraft_wall.py 的方式执行,需先 chmod +x aircraft_wall.py

上一篇:漏洞学习笔记-019-利用HeapSpray攻击ASLR


下一篇:《从机器学习到深度学习》笔记(3)强化学习