欢迎使用Python编程教程学习Carla自动驾驶/自动驾驶汽车的第三部分。在本教程中,我们将利用我们关于Carla API的知识,并尝试将这个问题转化为一个强化学习问题。
在OpenAI率先开发了强化学习环境和解决方案的开源之后,我们终于有了一种接近强化学习环境的标准化方法。
这里的想法是,你的环境将有一个step方法,它将返回:observation, reward, done, extra_info,以及一个reset方法,该方法将基于某种done标志重新启动环境。
我们需要做的就是创建代码来表示这个。我们从Carla的常规进口开始:
import glob
import os
import sys
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
接下来,我们将创建环境的类,我们将其称为CarEnv。在我们的脚本的顶部为模型、代理和环境设置一些常量将是方便的,所以我要做的第一件事是创建我们的第一个常量,将显示预览:
SHOW_PREVIEW = False
现在让我们为我们的环境类设置一些初始值:
class CarEnv:
SHOW_CAM = SHOW_PREVIEW
STEER_AMT = 1.0
im_width = IMG_WIDTH
im_height = IMG_HEIGHT
actor_list = []
front_camera = None
collision_hist = []
我认为这些是相当不言自明的,但是SHOW_CAM是我们是否想要显示预览。为了调试的目的,查看它可能会很有用,但您不一定希望一直显示它,因为执行所有这些操作可能会耗费大量资源。
STEER_AMT是我们想要应用到转向的多少。现在是一个大转弯。之后我们可能会发现,控制的力度要小一些,或许可以做一些累积的事情来代替……等。现在,全速前进吧!
使用collision_hist是因为碰撞传感器报告了事件的历史。基本上,如果这个列表中有什么,我们会说我们崩溃了。现在是init方法。所有你在之前的教程中看到的内容:
def __init__(self):
self.client = carla.Client('localhost', 2000)
self.client.set_timeout(2.0)
# Once we have a client we can retrieve the world that is currently
# running.
self.world = self.client.get_world()
# The world contains the list blueprints that we can use for adding new
# actors into the simulation.
blueprint_library = self.world.get_blueprint_library()
# Now let's filter all the blueprints of type 'vehicle' and choose one
# at random.
#print(blueprint_library.filter('vehicle'))
self.model_3 = blueprint_library.filter('model3')[0]
接下来,我们将创建我们的reset方法。
def reset(self):
self.collision_hist = []
self.actor_list = []
self.transform = random.choice(self.world.get_map().get_spawn_points())
self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
self.actor_list.append(self.vehicle)
self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
self.rgb_cam.set_attribute('fov', '110')
transform = carla.Transform(carla.Location(x=2.5, z=0.7))
self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
self.actor_list.append(self.sensor)
self.sensor.listen(lambda data: self.process_img(data))
self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0)) # initially passing some commands seems to help with time. Not sure why.
这里的一切都是你以前见过的,没有什么新奇的,只是OOP而已。接下来,让我们将碰撞传感器添加到这个方法中:
time.sleep(4) # sleep to get things started and to not detect a collision when the car spawns/falls from sky.
colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
self.actor_list.append(self.colsensor)
self.colsensor.listen(lambda event: self.collision_data(event))
我们在这里先睡4秒钟,因为汽车真的是“掉进”了模拟器。通常,当汽车撞到地面时,我们会收到碰撞记录。而且,最初,这些传感器会花一些时间来初始化和返回值,所以我们只会使用一个安全可靠的四秒。以防需要更长的时间,我们可以这样做:
while self.front_camera is None:
time.sleep(0.01)
我们不能这么做,然而,因为我们需要确定汽车是完成从天空掉落产卵。最后,让我们记录本集的实际启动时间,确保刹车和油门没有被使用,并返回我们的第一个观察结果:
self.episode_start = time.time()
self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))
return self.front_camera
现在,让我们添加collision_data和process_img方法:
def collision_data(self, event):
self.collision_hist.append(event)
def process_img(self, image):
i = np.array(image.raw_data)
#np.save("iout.npy", i)
i2 = i.reshape((self.im_height, self.im_width, 4))
i3 = i2[:, :, :3]
if self.SHOW_CAM:
cv2.imshow("",i3)
cv2.waitKey(1)
self.front_camera = i3
现在我们需要用步法。这个方法采取一个行动,然后返回观察,奖励,完成,任何额外的信息,就像通常的强化学习范例。开始:
def step(self, action):
'''
For now let's just pass steer left, center, right?
0, 1, 2
'''
if action == 0:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
if action == 1:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
if action == 2:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
以上内容展示了我们如何根据传递给我们的数值行动采取行动,现在我们只需要处理观察,可能的碰撞和奖励:
v = self.vehicle.get_velocity()
kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))
if len(self.collision_hist) != 0:
done = True
reward = -200
elif kmh < 50:
done = False
reward = -1
else:
done = False
reward = 1
if self.episode_start + SECONDS_PER_EPISODE < time.time():
done = True
return self.front_camera, reward, done, None
我们正在抓取车辆的速度,从速度转换到每公里时速。我这样做是为了避免探员学习在一个小圆圈里开车。如果我们需要一定的速度来获得奖励,这应该能够抑制这种行为。
接下来,我们检查是否已经耗尽了剧集时间,然后返回所有内容。这样,我们就和我们的环境结束了!
完整的代码:
import glob
import os
import sys
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
SHOW_PREVIEW = False
class CarEnv:
SHOW_CAM = SHOW_PREVIEW
STEER_AMT = 1.0
im_width = IMG_WIDTH
im_height = IMG_HEIGHT
actor_list = []
front_camera = None
collision_hist = []
def __init__(self):
self.client = carla.Client('localhost', 2000)
self.client.set_timeout(2.0)
# Once we have a client we can retrieve the world that is currently
# running.
self.world = self.client.get_world()
# The world contains the list blueprints that we can use for adding new
# actors into the simulation.
blueprint_library = self.world.get_blueprint_library()
# Now let's filter all the blueprints of type 'vehicle' and choose one
# at random.
#print(blueprint_library.filter('vehicle'))
self.model_3 = blueprint_library.filter('model3')[0]
def reset(self):
self.collision_hist = []
self.actor_list = []
self.transform = random.choice(self.world.get_map().get_spawn_points())
self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
self.actor_list.append(self.vehicle)
self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
self.rgb_cam.set_attribute('fov', '110')
transform = carla.Transform(carla.Location(x=2.5, z=0.7))
self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
self.actor_list.append(self.sensor)
self.sensor.listen(lambda data: self.process_img(data))
self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
time.sleep(4) # sleep to get things started and to not detect a collision when the car spawns/falls from sky.
colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
self.actor_list.append(self.colsensor)
self.colsensor.listen(lambda event: self.collision_data(event))
while self.front_camera is None:
time.sleep(0.01)
self.episode_start = time.time()
self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))
return self.front_camera
def collision_data(self, event):
self.collision_hist.append(event)
def process_img(self, image):
i = np.array(image.raw_data)
#np.save("iout.npy", i)
i2 = i.reshape((self.im_height, self.im_width, 4))
i3 = i2[:, :, :3]
if self.SHOW_CAM:
cv2.imshow("",i3)
cv2.waitKey(1)
self.front_camera = i3
def step(self, action):
'''
For now let's just pass steer left, center, right?
0, 1, 2
'''
if action == 0:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
if action == 1:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
if action == 2:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
v = self.vehicle.get_velocity()
kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))
if len(self.collision_hist) != 0:
done = True
reward = -200
elif kmh < 50:
done = False
reward = -1
else:
done = False
reward = 1
if self.episode_start + SECONDS_PER_EPISODE < time.time():
done = True
return self.front_camera, reward, done, None
在下一篇教程中,我们将编写我们的Agent类,它将与这个环境交互,并容纳我们实际的强化学习模型。