使用python结合redis 开发ros2节点
目的
在ros2中,使用redis作为实时数据共享,减少各个节点中的数据交互,从而减少系统开销。
功能
1、每一秒发布一次topic
2、每一秒写一次数据库,并调用
环境准备
1、已经安装有ros2的ubuntu虚拟机
2、安装python开发环境
3、redis已经安装完成,如果没有完成请参照:内存数据库redis,pyhon操作-环境准备和简单示例
程序示例
import rclpy
import os
from rclpy.node import Node
from std_msgs.msg import String
import pathlib
from redis import ConnectionPool
import redis
class robot_start(Node):
def __init__(self):
super().__init__('talker',namespace='test')
self.i = 0
self.pub = self.create_publisher(String, 'chatter')
self.timer = self.create_timer(1.0, self.timer_callback)
self.redis_conn = redis.Redis(host='localhost', port=6379, decode_responses=True)
self.redis_conn.set('name', 'aaa')
self.count =1
def timer_callback(self):
msg = String()
msg.data = 'Hello World: {0}'.format(self.i)
self.i += 1
self.get_logger().info('Publishing: "{0}"'.format(msg.data))
self.pub.publish(msg)
self.count += 1
self.redis_conn.set('name', self.count)
self.get_logger().info(self.redis_conn['name'])
self.get_logger().info(self.redis_conn.get('name'))
def main(args=None):
rclpy.init(args=args)
robotstart = robot_start()
rclpy.spin(robotstart)
robotstart.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
# Runs a talker node when this script is run directly (not through an entrypoint)
main()
结果
[INFO] [test.talker]: Publishing: "Hello World: 0"
[INFO] [test.talker]: 2
[INFO] [test.talker]: 2
[INFO] [test.talker]: Publishing: "Hello World: 1"
[INFO] [test.talker]: 3
[INFO] [test.talker]: 3
[INFO] [test.talker]: Publishing: "Hello World: 2"
[INFO] [test.talker]: 4
[INFO] [test.talker]: 4
[INFO] [test.talker]: Publishing: "Hello World: 3"
[INFO] [test.talker]: 5
[INFO] [test.talker]: 5
[INFO] [test.talker]: Publishing: "Hello World: 4"
[INFO] [test.talker]: 6
[INFO] [test.talker]: 6
[INFO] [test.talker]: Publishing: "Hello World: 5"
[INFO] [test.talker]: 7
[INFO] [test.talker]: 7