该代码用于分析和处理点云数据,通过对点云数据进行裁剪、平面拟合和生成包围框来提取特定区域的特征并发布结果。主要使用了RANSAC算法来识别并拟合平面,从而提取平面的法向量,接着根据该平面计算出该区域的最小矩形包围框(Bounding Box),并得到包围框的中心点以及相对于Y轴的倾角。最终,程序将处理后的点云和包围框通过ROS节点发布出来,用于后续的处理或可视化。
1.导入必要的库
import rospy
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs import point_cloud2
from std_msgs.msg import Header
from sklearn.linear_model import RANSACRegressor
from visualization_msgs.msg import Marker
from geometry_msgs.msg import Point
import math
2. 设置XYZ范围限制和RANSAC误差阈值
X_MIN, X_MAX = 0.5, 3
Y_MIN, Y_MAX = 0, 2
Z_MIN, Z_MAX = -0.5, 0.2
DISTANCE_THRESHOLD = 0.05 # RANSAC 拟合的误差阈值
3. 裁剪点云,限制XYZ坐标范围
def filter_xyz_range(points):
"""裁剪点云,将点限制在指定 XYZ 范围内"""
return points[
(points[:, 0] >= X_MIN) & (points[:, 0] <= X_MAX) &
(points[:, 1] >= Y_MIN) & (points[:, 1] <= Y_MAX) &
(points[:, 2] >= Z_MIN) & (points[:, 2] <= Z_MAX)
]
4. 使用RANSAC算法拟合平面
def fit_plane_ransac(points):
"""使用 RANSAC 算法拟合平面,返回平面的法向量和拟合的平面点"""
if len(points) < 3:
return None, None
X = points[:, :2] # 使用 x, y 特征
y = points[:, 2] # Z 坐标作为目标值
ransac = RANSACRegressor(residual_threshold=DISTANCE_THRESHOLD)
ransac.fit(X, y)
inlier_mask = ransac.inlier_mask_
plane_points = points[inlier_mask] # 选取平面内的点
normal_vector = np.array([ransac.estimator_.coef_[0], ransac.estimator_.coef_[1], -1]) # 计算法向量
return normal_vector, plane_points
5. 算包围框的8个顶点坐标及中心点坐标
def compute_bounding_box(points):
"""根据点云计算最小矩形包围框的8个顶点坐标"""
min_vals = np.min(points, axis=0)
max_vals = np.max(points, axis=0)
# 计算矩形框8个顶点坐标
p1 = [min_vals[0], min_vals[1], min_vals[2]]
p2 = [min_vals[0], min_vals[1], max_vals[2]]
p3 = [min_vals[0], max_vals[1], min_vals[2]]
p4 = [min_vals[0], max_vals[1], max_vals[2]]
p5 = [max_vals[0], min_vals[1], min_vals[2]]
p6 = [max_vals[0], min_vals[1], max_vals[2]]
p7 = [max_vals[0], max_vals[1], min_vals[2]]
p8 = [max_vals[0], max_vals[1], max_vals[2]]
return np.array([p1, p2, p3, p4, p5, p6, p7, p8])
def compute_bounding_box_center(bbox_points):
"""计算包围框的中心坐标"""
center = np.mean(bbox_points, axis=0)
return center
6. 计算平面法向量与Y轴的倾斜角
def compute_pitch_angle(normal_vector):
"""计算法向量和Y轴的倾斜角(以度为单位)"""
y_axis = np.array([0, 1, 0])
cos_angle = np.dot(normal_vector, y_axis) / (np.linalg.norm(normal_vector) * np.linalg.norm(y_axis))
cos_angle = np.clip(cos_angle, -1.0, 1.0)
angle = math.acos(cos_angle) # 计算夹角(弧度)
return math.degrees(angle) # 转换为角度
7. 创建包围框的Marker消息(可视化)
def create_bounding_box_marker(bbox_points, frame_id):
"""创建一个长方体包围框的 Marker 消息,用于可视化"""
marker = Marker()
marker.header.frame_id = frame_id
marker.header.stamp = rospy.Time.now()
marker.type = Marker.LINE_LIST # 使用线框模型
marker.action = Marker.ADD
marker.scale.x = 0.01 # 线的粗细
marker.color.r = 1.0
marker.color.g = 0.0
marker.color.b = 0.0
marker.color.a = 1.0
# 定义包围框的边界线
edges = [
[0, 1], [1, 3], [3, 2], [2, 0], # 底面四条边
[4, 5], [5, 7], [7, 6], [6, 4], # 顶面四条边
[0, 4], [1, 5], [2, 6], [3, 7] # 连接底面和顶面的四条边
]
# 构建包围框的线条
for edge in edges:
p1 = Point(*bbox_points[edge[0]])
p2 = Point(*bbox_points[edge[1]])
marker.points.append(p1)
marker.points.append(p2)
return marker
8.发布点云话题:
def publish_point_cloud(points, frame_id, topic):
"""发布点云消息"""
if len(points) == 0:
rospy.logwarn("点云为空,未发布消息。")
return
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = frame_id
cloud_msg = point_cloud2.create_cloud_xyz32(header, points)
rospy.Publisher(topic, PointCloud2, queue_size=1).publish(cloud_msg)
9.回调函数及主函数
def point_cloud_callback(msg):
points = np.array([p[:3] for p in point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)])
points_in_range = filter_xyz_range(points)
if points_in_range.size == 0:
rospy.logwarn("裁剪后没有点云数据。")
return
normal1, plane_points1 = fit_plane_ransac(points_in_range)
normal2, plane_points2 = fit_plane_ransac(points_in_range)
if normal1 is None or normal2 is None:
rospy.logwarn("平面拟合失败,无法生成包围框。")
return
bounding_box_points = compute_bounding_box(points_in_range)
bbox_center = compute_bounding_box_center(bounding_box_points)
angle = compute_pitch_angle(normal1)
# 输出三维矩形框的中心坐标
rospy.loginfo(f"Bounding Box Center: {bbox_center}")
rospy.loginfo(f"Angle with Y axis: {angle:.2f} degrees")
bbox_marker = create_bounding_box_marker(bounding_box_points, msg.header.frame_id)
publish_point_cloud(points_in_range, msg.header.frame_id, "/filtered_points")
bbox_pub.publish(bbox_marker)
def main():
rospy.init_node("bounding_box_fitter")
# 初始化发布器
global bbox_pub
bbox_pub = rospy.Publisher("/bounding_box", Marker, queue_size=1)
# 订阅点云话题
rospy.Subscriber("/merged_point_cloud", PointCloud2, point_cloud_callback)
rospy.spin()
10.完整的代码如下:
import rospy
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs import point_cloud2
from std_msgs.msg import Header
from sklearn.linear_model import RANSACRegressor
from visualization_msgs.msg import Marker
from geometry_msgs.msg import Point
import math
# 设置 XYZ 范围限制
X_MIN, X_MAX = 0.5, 3
Y_MIN, Y_MAX = 0, 2
Z_MIN, Z_MAX = -0.5, 0.2
DISTANCE_THRESHOLD = 0.05 # RANSAC 拟合的误差阈值
def filter_xyz_range(points):
"""裁剪点云,将点限制在指定 XYZ 范围内"""
return points[
(points[:, 0] >= X_MIN) & (points[:, 0] <= X_MAX) &
(points[:, 1] >= Y_MIN) & (points[:, 1] <= Y_MAX) &
(points[:, 2] >= Z_MIN) & (points[:, 2] <= Z_MAX)
]
def fit_plane_ransac(points):
"""使用 RANSAC 算法拟合平面,返回平面的法向量和拟合的平面点"""
if len(points) < 3:
return None, None
X = points[:, :2] # 使用 x, y 特征
y = points[:, 2] # Z 坐标作为目标值
ransac = RANSACRegressor(residual_threshold=DISTANCE_THRESHOLD)
ransac.fit(X, y)
inlier_mask = ransac.inlier_mask_
plane_points = points[inlier_mask] # 选取平面内的点
normal_vector = np.array([ransac.estimator_.coef_[0], ransac.estimator_.coef_[1], -1]) # 计算法向量
return normal_vector, plane_points
def compute_bounding_box(points):
"""根据点云计算最小矩形包围框的8个顶点坐标"""
min_vals = np.min(points, axis=0)
max_vals = np.max(points, axis=0)
# 计算矩形框8个顶点坐标
p1 = [min_vals[0], min_vals[1], min_vals[2]]
p2 = [min_vals[0], min_vals[1], max_vals[2]]
p3 = [min_vals[0], max_vals[1], min_vals[2]]
p4 = [min_vals[0], max_vals[1], max_vals[2]]
p5 = [max_vals[0], min_vals[1], min_vals[2]]
p6 = [max_vals[0], min_vals[1], max_vals[2]]
p7 = [max_vals[0], max_vals[1], min_vals[2]]
p8 = [max_vals[0], max_vals[1], max_vals[2]]
return np.array([p1, p2, p3, p4, p5, p6, p7, p8])
def compute_bounding_box_center(bbox_points):
"""计算包围框的中心坐标"""
center = np.mean(bbox_points, axis=0)
return center
def compute_pitch_angle(normal_vector):
"""计算法向量和Y轴的倾斜角(以度为单位)"""
y_axis = np.array([0, 1, 0])
cos_angle = np.dot(normal_vector, y_axis) / (np.linalg.norm(normal_vector) * np.linalg.norm(y_axis))
cos_angle = np.clip(cos_angle, -1.0, 1.0)
angle = math.acos(cos_angle) # 计算夹角(弧度)
return math.degrees(angle) # 转换为角度
def create_bounding_box_marker(bbox_points, frame_id):
"""创建一个长方体包围框的 Marker 消息,用于可视化"""
marker = Marker()
marker.header.frame_id = frame_id
marker.header.stamp = rospy.Time.now()
marker.type = Marker.LINE_LIST # 使用线框模型
marker.action = Marker.ADD
marker.scale.x = 0.01 # 线的粗细
marker.color.r = 1.0
marker.color.g = 0.0
marker.color.b = 0.0
marker.color.a = 1.0
# 定义包围框的边界线
edges = [
[0, 1], [1, 3], [3, 2], [2, 0], # 底面四条边
[4, 5], [5, 7], [7, 6], [6, 4], # 顶面四条边
[0, 4], [1, 5], [2, 6], [3, 7] # 连接底面和顶面的四条边
]
for edge in edges:
p1 = Point(*bbox_points[edge[0]])
p2 = Point(*bbox_points[edge[1]])
marker.points.append(p1)
marker.points.append(p2)
return marker
def publish_point_cloud(points, frame_id, topic):
"""发布裁剪后的点云消息"""
if len(points) == 0:
rospy.logwarn("点云为空,未发布消息。")
return
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = frame_id
cloud_msg = point_cloud2.create_cloud_xyz32(header, points)
rospy.Publisher(topic, PointCloud2, queue_size=1).publish(cloud_msg)
def point_cloud_callback(msg):
"""接收并处理点云数据,进行裁剪、平面拟合、包围框计算并发布"""
points = np.array([p[:3] for p in point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)])
points_in_range = filter_xyz_range(points)
if points_in_range.size == 0:
rospy.logwarn("裁剪后没有点云数据。")
return
normal1, plane_points1 = fit_plane_ransac(points_in_range)
normal2, plane_points2 = fit_plane_ransac(points_in_range)
if normal1 is None or normal2 is None:
rospy.logwarn("平面拟合失败,无法生成包围框。")
return
bounding_box_points = compute_bounding_box(points_in_range)
bbox_center = compute_bounding_box_center(bounding_box_points)
angle = compute_pitch_angle(normal1)
# 输出三维矩形框的中心坐标
rospy.loginfo(f"Bounding Box Center: {bbox_center}")
rospy.loginfo(f"Angle with Y axis: {angle:.2f} degrees")
bbox_marker = create_bounding_box_marker(bounding_box_points, msg.header.frame_id)
publish_point_cloud(points_in_range, msg.header.frame_id, "/filtered_points")
bbox_pub.publish(bbox_marker)
def main():
"""初始化 ROS 节点、发布器和订阅器"""
rospy.init_node("bounding_box_fitter")
# 初始化发布器
global bbox_pub
bbox_pub = rospy.Publisher("/bounding_box", Marker, queue_size=1)
# 订阅点云话题
rospy.Subscriber("/merged_point_cloud", PointCloud2, point_cloud_callback)
rospy.spin()
if __name__ == "__main__":
main()
11.运行程序以及自己的bag包,别忘了更改相应的话题 ,结果如下: