ROS 2 系列 2 - Topic 编程
Topic(话题)是 ROS 2 中最常用的异步通信方式。它像“电台广播”:发布者(Publisher)只管把数据扔到话题上,订阅者(Subscriber) 只要感兴趣,并且注册回调函数,就能收到数据。发布者和订阅者互不知晓对方存在,完全解耦。
本节将讲述:
- 创建存放自定义消息的接口包。
- 定义
Velocity.msg。
- 创建 Python 功能包。
- 编写发布者节点(每秒发送速度指令)。
- 编写订阅者节点(接收及打印速度指令)。
- 编译运行及验证。
1. 创建自定义消息接口包(custom_interfaces)
在 ROS 2 中,msg 文件必须放在独立的接口包里。
打开终端,进入工作空间 src 目录:
cd ~/ros2_ws/src创建 C++ 类型(ament_cmake)的接口包(因为生成 msg 需要编译时构建工具,用 ament_cmake 最稳妥):
ros2 pkg create --build-type ament_cmake custom_interfaces进入包目录,创建 msg 目录,定义消息:
cd custom_interfaces
mkdir msg在 msg 目录中新建 Velocity.msg 文件,写入以下内容:
float64 linear_x # 前进速度 (m/s)
float64 angular_z # 旋转角速度 (rad/s)修改 CMakeLists.txt:找到 find_package 部分,添加 rosidl_default_generators,在文件末尾添加:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Velocity.msg"
)修改 package.xml:在 <buildtool_depend> 后添加以下依赖:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>编译这个接口包(回到工作空间根目录):
cd ~/ros2_ws
colcon build --packages-select custom_interfaces
# 或 colcon build --cmake-args -DPython3_EXECUTABLE=/usr/bin/python3Source 环境,找到该新接口:
source install/setup.bash2. 创建 Python 功能包(topic_demo_py)
创建存放节点代码的 Python 包。
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python topic_demo_py --dependencies rclpy custom_interfaces--dependencies 自动将 rclpy 和 custom_interfaces 写入 package.xml。
3. 编写发布者节点(velocity_publisher.py)
在 topic_demo_py/topic_demo_py/ 目录下新建文件 velocity_publisher.py,写入以下代码:
import rclpy
from rclpy.node import Node
from custom_interfaces.msg import Velocity # 导入自定义消息
class VelocityPublisher(Node):
def __init__(self):
super().__init__('velocity_publisher')
# 创建发布者:消息类型 Velocity,话题名 "/robot_velocity",队列深度 10
self.publisher_ = self.create_publisher(Velocity, '/robot_velocity', 10)
# 创建一个定时器,每 0.5 秒触发一次回调
self.timer = self.create_timer(0.5, self.timer_callback)
self.counter = 0
def timer_callback(self):
msg = Velocity()
# 让速度值做点小变化,模拟机器人加减速
msg.linear_x = 1.0 + 0.5 * (self.counter % 4)
msg.angular_z = 0.5 * (self.counter % 3)
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: linear={msg.linear_x:.2f}, angular={msg.angular_z:.2f}')
self.counter += 1
def main(args=None):
rclpy.init(args=args)
node = VelocityPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()4. 编写订阅者节点(velocity_subscriber.py)
在同一目录下新建 velocity_subscriber.py,写入:
import rclpy
from rclpy.node import Node
from custom_interfaces.msg import Velocity # 导入相同的消息类型
class VelocitySubscriber(Node):
def __init__(self):
super().__init__('velocity_subscriber')
# 创建订阅者:订阅 "/robot_velocity" 话题,回调函数为 listener_callback
self.subscription = self.create_subscription(
Velocity,
'/robot_velocity',
self.listener_callback,
10 # 队列深度
)
def listener_callback(self, msg):
# 每当收到一条消息,ROS 2 就会自动调用这个函数
self.get_logger().info(
f'Received: linear_x={msg.linear_x:.2f} m/s, angular_z={msg.angular_z:.2f} rad/s'
)
def main(args=None):
rclpy.init(args=args)
node = VelocitySubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()5. 配置 setup.py 入口点(Entry Points)
为让 ros2 run 能找到这两个节点,需要修改 topic_demo_py/setup.py。
找到 entry_points 字段,修改为:
entry_points={
'console_scripts': [
'publisher = topic_demo_py.velocity_publisher:main',
'subscriber = topic_demo_py.velocity_subscriber:main',
],
},6. 编译及运行
回到工作空间根目录编译整个工作空间(确保接口包和功能包都已编译):
cd ~/ros2_ws
colcon build
source install/setup.bash # 务必 source 环境打开两个终端(都需 source 环境):
6.1. 终端 1 - 运行发布者
ros2 run topic_demo_py publisher每隔 0.5 秒打印一次发布的速度值。
6.2. 终端 2:运行订阅者
ros2 run topic_demo_py subscriber打印收到的速度值,一一对应。
7. 用命令行工具验证
7.1. 查看当前所有活跃话题
ros2 topic list7.2. 直接打印话题上的数据
ros2 topic echo /robot_velocity该命令是排查 Topic 是否有数据发出的第一把交椅。
7.3. 手动发送数据
ros2 topic pub /robot_velocity custom_interfaces/msg/Velocity "{linear_x: 2.5, angular_z: 1.2}"