一、ROS+OpenCV图像处理
OpenCV是跨平台的的开源计算机视觉库。ROS中使用OpenCV时,需要注意图像数据的转换。cv_bridge_test.py
脚本文件可以实现该转换功能,文件内容如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import cv2
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image
class image_converter:
def __init__(self):
# 创建cv_bridge,声明图像的发布者和订阅者
self.image_pub = rospy.Publisher("cv_bridge_image", Image, queue_size=1)
self.bridge = CvBridge()
self.image_sub = rospy.Subscriber("/usb_cam/image_raw", Image, self.callback)
def callback(self,data):
# 使用cv_bridge将ROS的图像数据转换成OpenCV的图像格式
try:
cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
except CvBridgeError as e:
print e
# 在opencv的显示窗口中绘制一个圆,作为标记
(rows,cols,channels) = cv_image.shape
if cols > 60 and rows > 60 :
cv2.circle(cv_image, (60, 60), 30, (0,0,255), -1)
# 显示Opencv格式的图像
cv2.imshow("Image window", cv_image)
cv2.waitKey(3)
# 再将opencv格式额数据转换成ros image格式的数据发布
try:
self.image_pub.publish(self.bridge.cv2_to_imgmsg(cv_image, "bgr8"))
except CvBridgeError as e:
print e
if __name__ == '__main__':
try:
# 初始化ros节点
rospy.init_node("cv_bridge_test")
rospy.loginfo("Starting cv_bridge_test node")
image_converter()
rospy.spin()
except KeyboardInterrupt:
print "Shutting down cv_bridge_test node."
cv2.destroyAllWindows()
imgmsg_to_cv2()
:ROS图像数据格式转换成OpenCV图像格式。cv2_to_imgmsg()
:OpenCV图像数据格式转换成ROS图像格式。
二、人脸识别
通过人脸识别的方式实现以下场景:
- 通过人脸识别的方式,发布速度控制命令,控制仿真机器人运动。例如:人脸向左移动,小车向左转,人脸向前移动,小车前进。
2.1 思路
物体的识别后会得到物体在图片中的中心点(x, y)和识别物体大小的矩形宽(w)和高(h),我们可以借助(x, y)处于图片的什么区域对小车转向进行控制,而识别物体的矩形框的大小可以控制小车的前进后退。
假设摄像机采集的图片分辨率为\(W \times H\),则有小车的控制规则如下:
if ( w > W/2 and h > H/2):
小车前进
else:
小车后退
if (x > W/2):
小车右转
else:
小车左转
2.2 基于检测人脸控制小车
roslaunch mbot_gazebo view_mbot_gazebo_empty_world_automobile.launch #打开小车模型
roslaunch robot_vision usb_cam.launch #打开摄像头
roslaunch robot_vison face_detector.launch #基于人脸检测控制小车
rqt_image_view
其中usb_cam.launch
内容如下:
<launch>
<node name="usb_cam" pkg="usb_cam" type="usb_cam_node" output="screen" >
<param name="video_device" value="/dev/video0" />
<param name="image_width" value="640" />
<param name="image_height" value="480" />
<param name="pixel_format" value="yuyv" />
<param name="camera_frame_id" value="usb_cam" />
<param name="io_method" value="mmap"/>
</node>
</launch>
face_detector.launch
如下:
<launch>
<node pkg="robot_vision" name="face_detector" type="face_detector.py" output="screen">
<remap from="input_rgb_image" to="/usb_cam/image_raw" />
<rosparam>
haar_scaleFactor: 1.2
haar_minNeighbors: 2
haar_minSize: 40
haar_maxSize: 60
</rosparam>
<param name="cascade_1" value="$(find robot_vision)/data/haar_detectors/haarcascade_frontalface_alt.xml" />
<param name="cascade_2" value="$(find robot_vision)/data/haar_detectors/haarcascade_profileface.xml" />
</node>
</launch>
核心算法脚本文件face_detector.py
,其内容如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import cv2
import numpy as np
from sensor_msgs.msg import Image, RegionOfInterest
from cv_bridge import CvBridge, CvBridgeError
from geometry_msgs.msg import Twist
class faceDetector:
def __init__(self):
rospy.on_shutdown(self.cleanup);
# 创建cv_bridge
self.bridge = CvBridge()
self.image_pub = rospy.Publisher("cv_bridge_image", Image, queue_size=1)
# create commond of 'mbot_teleop_object'
self.mbot_teleop_object = rospy.Publisher('/cmd_vel', Twist, queue_size=5)
# 获取haar特征的级联表的XML文件,文件路径在launch文件中传入
cascade_1 = rospy.get_param("~cascade_1", "")
cascade_2 = rospy.get_param("~cascade_2", "")
# 使用级联表初始化haar特征检测器
self.cascade_1 = cv2.CascadeClassifier(cascade_1)
self.cascade_2 = cv2.CascadeClassifier(cascade_2)
# 设置级联表的参数,优化人脸识别,可以在launch文件中重新配置
self.haar_scaleFactor = rospy.get_param("~haar_scaleFactor", 1.2)
self.haar_minNeighbors = rospy.get_param("~haar_minNeighbors", 2)
self.haar_minSize = rospy.get_param("~haar_minSize", 40)
self.haar_maxSize = rospy.get_param("~haar_maxSize", 60)
self.color = (50, 255, 50)
# 初始化订阅rgb格式图像数据的订阅者,此处图像topic的话题名可以在launch文件中重映射
self.image_sub = rospy.Subscriber("input_rgb_image", Image, self.image_callback, queue_size=1)
def image_callback(self, data):
# 使用cv_bridge将ROS的图像数据转换成OpenCV的图像格式
try:
cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
frame = np.array(cv_image, dtype=np.uint8)
except CvBridgeError, e:
print e
# 创建灰度图像
grey_image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 创建平衡直方图,减少光线影响
grey_image = cv2.equalizeHist(grey_image)
# 尝试检测人脸
faces_result = self.detect_face(grey_image)
# 创建并发布twist消息
twist = Twist()
image_w = grey_image.shape[0]
image_h = grey_image.shape[1]
# 在opencv的窗口中框出所有人脸区域
if len(faces_result)>0:
for face in faces_result:
x, y, w, h = face
cv2.rectangle(cv_image, (x, y), (x+w, y+h), self.color, 2)
rospy.loginfo("my face detector (x=%d, y=%d, w=%d, h=%d)", x, y, w, h)
info = ""
if( w > image_w/2 and h > image_h/2): #front
twist.linear.x = 0.5
info += "front"
else:
twist.linear.x = -0.5 #back
info += "back"
twist.linear.y = 0;
twist.linear.z = 0;
twist.angular.x = 0;
twist.angular.y = 0;
if (x > image_w/2):
twist.angular.z = 0.5 #turn right
info += "-right"
else:
twist.angular.z = -0.5 #turn left
info += "-left"
rospy.loginfo("my twist(l_x=%f, a_z=%f)", twist.linear.x, twist.angular.z)
rospy.loginfo(info)
self.mbot_teleop_object.publish(twist)
# 将识别后的图像转换成ROS消息并发布
self.image_pub.publish(self.bridge.cv2_to_imgmsg(cv_image, "bgr8"))
def detect_face(self, input_image):
# 首先匹配正面人脸的模型
if self.cascade_1:
faces = self.cascade_1.detectMultiScale(input_image,
self.haar_scaleFactor,
self.haar_minNeighbors,
cv2.CASCADE_SCALE_IMAGE,
(self.haar_minSize, self.haar_maxSize))
# 如果正面人脸匹配失败,那么就尝试匹配侧面人脸的模型
if len(faces) == 0 and self.cascade_2:
faces = self.cascade_2.detectMultiScale(input_image,
self.haar_scaleFactor,
self.haar_minNeighbors,
cv2.CASCADE_SCALE_IMAGE,
(self.haar_minSize, self.haar_maxSize))
return faces
def cleanup(self):
print "Shutting down vision node."
cv2.destroyAllWindows()
if __name__ == '__main__':
try:
# 初始化ros节点
rospy.init_node("face_detector")
faceDetector()
rospy.loginfo("Face detector is started..")
rospy.loginfo("Please subscribe the ROS image.")
rospy.spin()
except KeyboardInterrupt:
print "Shutting down face detector node."
cv2.destroyAllWindows()
效果如下:
三、物体识别
复现tensorflow物体识别的案例,并实现以下的物体跟随场景:
- 识别一个杯子及其在图像中的位置。
- 根据杯子的识别结果,发布速度控制命令,控制仿真机器人的运动。例如:杯子远离摄像头,小车前进,杯子在图像中向左运动,小车左转。
创建tensorflow_object_detector
功能包,运行命令如下:
roslaunch mbot_gazebo view_mbot_gazebo_empty_world_automobile.launch #打开小车模型
roslaunch tensorflow_object_detector usb_cam_detector.launch #基于水杯检测控制小车
其中usb_cam_detector.launch
为相机启动文件,内容如下:
<launch>
<node pkg= "tensorflow_object_detector" name="detect_ros" type="detect_ros.py" output="screen">
<remap from="image" to="/usb_cam_node/image_raw"/>
</node>
<node pkg="usb_cam" type="usb_cam_node" name="usb_cam_node" output="screen">
<param name="pixel_format" value="yuyv"/>
<param name="video_device" value="/dev/video0"/>
</node>
<node pkg="image_view" type="image_view" name="image_view">
<remap from="image" to="debug_image"/>
</node>
</launch>
核心算法脚本文件detect_ros.py
,其内容如下:
#!/usr/bin/env python
## Author: Rohit
## Date: July, 25, 2017
# Purpose: Ros node to detect objects using tensorflow
import os
import sys
import cv2
import numpy as np
try:
import tensorflow as tf
except ImportError:
print("unable to import TensorFlow. Is it installed?")
print(" sudo apt install python-pip")
print(" sudo pip install tensorflow")
sys.exit(1)
# ROS related imports
import rospy
from std_msgs.msg import String , Header
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
from vision_msgs.msg import Detection2D, Detection2DArray, ObjectHypothesisWithPose
# Object detection module imports
import object_detection
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
from geometry_msgs.msg import Twist
# SET FRACTION OF GPU YOU WANT TO USE HERE
GPU_FRACTION = 0.4
######### Set model here ############
MODEL_NAME = 'ssd_mobilenet_v1_coco_11_06_2017'
# By default models are stored in data/models/
MODEL_PATH = os.path.join(os.path.dirname(sys.path[0]),'data','models' , MODEL_NAME)
# Path to frozen detection graph. This is the actual model that is used for the object detection.
PATH_TO_CKPT = MODEL_PATH + '/frozen_inference_graph.pb'
######### Set the label map file here ###########
LABEL_NAME = 'mscoco_label_map.pbtxt'
# By default label maps are stored in data/labels/
PATH_TO_LABELS = os.path.join(os.path.dirname(sys.path[0]),'data','labels', LABEL_NAME)
######### Set the number of classes here #########
NUM_CLASSES = 90
detection_graph = tf.Graph()
with detection_graph.as_default():
od_graph_def = tf.GraphDef()
with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
serialized_graph = fid.read()
od_graph_def.ParseFromString(serialized_graph)
tf.import_graph_def(od_graph_def, name='')
## Loading label map
# Label maps map indices to category names, so that when our convolution network predicts `5`,
# we know that this corresponds to `airplane`. Here we use internal utility functions,
# but anything that returns a dictionary mapping integers to appropriate string labels would be fine
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
category_index = label_map_util.create_category_index(categories)
# Setting the GPU options to use fraction of gpu that has been set
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = GPU_FRACTION
# Detection
class Detector:
def __init__(self):
self.image_pub = rospy.Publisher("debug_image",Image, queue_size=1)
self.object_pub = rospy.Publisher("objects", Detection2DArray, queue_size=1)
self.bridge = CvBridge()
self.image_sub = rospy.Subscriber("image", Image, self.image_cb, queue_size=1, buff_size=2**24)
self.sess = tf.Session(graph=detection_graph,config=config)
# create commond of 'mbot_teleop_object'
self.mbot_teleop_object = rospy.Publisher('/cmd_vel', Twist, queue_size=5)
def image_cb(self, data):
objArray = Detection2DArray()
try:
cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
except CvBridgeError as e:
print(e)
image=cv2.cvtColor(cv_image,cv2.COLOR_BGR2RGB)
# the array based representation of the image will be used later in order to prepare the
# result image with boxes and labels on it.
image_np = np.asarray(image)
# Expand dimensions since the model expects images to have shape: [1, None, None, 3]
image_np_expanded = np.expand_dims(image_np, axis=0)
image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
# Each box represents a part of the image where a particular object was detected.
boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
# Each score represent how level of confidence for each of the objects.
# Score is shown on the result image, together with the class label.
scores = detection_graph.get_tensor_by_name('detection_scores:0')
classes = detection_graph.get_tensor_by_name('detection_classes:0')
num_detections = detection_graph.get_tensor_by_name('num_detections:0')
(boxes, scores, classes, num_detections) = self.sess.run([boxes, scores, classes, num_detections],
feed_dict={image_tensor: image_np_expanded})
objects=vis_util.visualize_boxes_and_labels_on_image_array(
image,
np.squeeze(boxes),
np.squeeze(classes).astype(np.int32),
np.squeeze(scores),
category_index,
use_normalized_coordinates=True,
line_thickness=2)
objArray.detections =[]
objArray.header=data.header
object_count=1
for i in range(len(objects)):
object_count+=1
objArray.detections.append(self.object_predict(objects[i],data.header,image_np,cv_image))
self.object_pub.publish(objArray)
img=cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
image_out = Image()
try:
image_out = self.bridge.cv2_to_imgmsg(img,"bgr8")
except CvBridgeError as e:
print(e)
image_out.header = data.header
self.image_pub.publish(image_out)
def object_predict(self,object_data, header, image_np,image):
image_height,image_width,channels = image.shape
obj=Detection2D()
obj_hypothesis= ObjectHypothesisWithPose()
object_id=object_data[0]
object_score=object_data[1]
dimensions=object_data[2]
obj.header=header
obj_hypothesis.id = str(object_id)
obj_hypothesis.score = object_score
obj.results.append(obj_hypothesis)
obj.bbox.size_y = int((dimensions[2]-dimensions[0])*image_height)
obj.bbox.size_x = int((dimensions[3]-dimensions[1] )*image_width)
obj.bbox.center.x = int((dimensions[1] + dimensions [3])*image_height/2)
obj.bbox.center.y = int((dimensions[0] + dimensions[2])*image_width/2)
if object_id == 47:
image_w = image_width
image_h = image_height
twist = Twist()
w = obj.bbox.size_x
h = obj.bbox.size_y
x = obj.bbox.center.x
y = obj.bbox.center.y
info = ""
if( w > image_w/2 and h > image_h/2): #front
twist.linear.x = 0.5
info += "front"
else:
twist.linear.x = -0.5 #back
info += "back"
twist.linear.y = 0;
twist.linear.z = 0;
twist.angular.x = 0;
twist.angular.y = 0;
if (x > image_w/2):
twist.angular.z = 0.5 #turn right
info += "-right"
else:
twist.angular.z = -0.5 #turn left
info += "-left"
rospy.loginfo("id=%d x=%d y=%d w=%d h=%d", object_id, obj.bbox.center.x, obj.bbox.center.y, obj.bbox.size_x, obj.bbox.size_y)
self.mbot_teleop_object.publish(twist)
return obj
def main(args):
rospy.init_node('detector_node')
obj=Detector()
try:
rospy.spin()
except KeyboardInterrupt:
print("ShutDown")
cv2.destroyAllWindows()
if __name__=='__main__':
main(sys.argv)
运行效果如下: