ROS 2 系列 2 - Topic 编程

Topic(话题)是 ROS 2 中最常用的异步通信方式。它像“电台广播”:发布者(Publisher)只管把数据扔到话题上,订阅者(Subscriber) 只要感兴趣,并且注册回调函数,就能收到数据。发布者和订阅者互不知晓对方存在,完全解耦。

本节将讲述:

  1. 创建存放自定义消息的接口包。
  1. 定义 Velocity.msg
  1. 创建 Python 功能包。
  1. 编写发布者节点(每秒发送速度指令)。
  1. 编写订阅者节点(接收及打印速度指令)。
  1. 编译运行及验证。

1. 创建自定义消息接口包(custom_interfaces

在 ROS 2 中,msg 文件必须放在独立的接口包里。

打开终端,进入工作空间 src 目录:

cd ~/ros2_ws/src

创建 C++ 类型(ament_cmake)的接口包(因为生成 msg 需要编译时构建工具,用 ament_cmake 最稳妥):

ros2 pkg create --build-type ament_cmake custom_interfaces

进入包目录,创建 msg 目录,定义消息:

cd custom_interfaces
mkdir msg

在 msg 目录中新建 Velocity.msg 文件,写入以下内容:

float64 linear_x   # 前进速度 (m/s)
float64 angular_z  # 旋转角速度 (rad/s)

修改 CMakeLists.txt:找到 find_package 部分,添加 rosidl_default_generators,在文件末尾添加:

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Velocity.msg"
)

修改 package.xml:在 <buildtool_depend> 后添加以下依赖:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

编译这个接口包(回到工作空间根目录):

cd ~/ros2_ws
colcon build --packages-select custom_interfaces
# 或 colcon build --cmake-args -DPython3_EXECUTABLE=/usr/bin/python3

Source 环境,找到该新接口:

source install/setup.bash

2. 创建 Python 功能包(topic_demo_py

创建存放节点代码的 Python 包。

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python topic_demo_py --dependencies rclpy custom_interfaces

--dependencies 自动将 rclpy 和 custom_interfaces 写入 package.xml


3. 编写发布者节点(velocity_publisher.py

在 topic_demo_py/topic_demo_py/ 目录下新建文件 velocity_publisher.py,写入以下代码:

import rclpy
from rclpy.node import Node
from custom_interfaces.msg import Velocity  # 导入自定义消息

class VelocityPublisher(Node):
    def __init__(self):
        super().__init__('velocity_publisher')
        # 创建发布者:消息类型 Velocity,话题名 "/robot_velocity",队列深度 10
        self.publisher_ = self.create_publisher(Velocity, '/robot_velocity', 10)
        # 创建一个定时器,每 0.5 秒触发一次回调
        self.timer = self.create_timer(0.5, self.timer_callback)
        self.counter = 0

    def timer_callback(self):
        msg = Velocity()
        # 让速度值做点小变化,模拟机器人加减速
        msg.linear_x = 1.0 + 0.5 * (self.counter % 4)  
        msg.angular_z = 0.5 * (self.counter % 3)
        
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: linear={msg.linear_x:.2f}, angular={msg.angular_z:.2f}')
        self.counter += 1

def main(args=None):
    rclpy.init(args=args)
    node = VelocityPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

4. 编写订阅者节点(velocity_subscriber.py

在同一目录下新建 velocity_subscriber.py,写入:

import rclpy
from rclpy.node import Node
from custom_interfaces.msg import Velocity  # 导入相同的消息类型

class VelocitySubscriber(Node):
    def __init__(self):
        super().__init__('velocity_subscriber')
        # 创建订阅者:订阅 "/robot_velocity" 话题,回调函数为 listener_callback
        self.subscription = self.create_subscription(
            Velocity,
            '/robot_velocity',
            self.listener_callback,
            10  # 队列深度
        )

    def listener_callback(self, msg):
        # 每当收到一条消息,ROS 2 就会自动调用这个函数
        self.get_logger().info(
            f'Received: linear_x={msg.linear_x:.2f} m/s, angular_z={msg.angular_z:.2f} rad/s'
        )

def main(args=None):
    rclpy.init(args=args)
    node = VelocitySubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

5. 配置 setup.py 入口点(Entry Points)

为让 ros2 run 能找到这两个节点,需要修改 topic_demo_py/setup.py

找到 entry_points 字段,修改为:

entry_points={
    'console_scripts': [
        'publisher = topic_demo_py.velocity_publisher:main',
        'subscriber = topic_demo_py.velocity_subscriber:main',
    ],
},

6. 编译及运行

回到工作空间根目录编译整个工作空间(确保接口包和功能包都已编译):

cd ~/ros2_ws
colcon build
source install/setup.bash   # 务必 source 环境

打开两个终端(都需 source 环境):

6.1. 终端 1 - 运行发布者

ros2 run topic_demo_py publisher

每隔 0.5 秒打印一次发布的速度值。

6.2. 终端 2:运行订阅者

ros2 run topic_demo_py subscriber

打印收到的速度值,一一对应。


7. 用命令行工具验证

7.1. 查看当前所有活跃话题

ros2 topic list

7.2. 直接打印话题上的数据

ros2 topic echo /robot_velocity

该命令是排查 Topic 是否有数据发出的第一把交椅

7.3. 手动发送数据

ros2 topic pub /robot_velocity custom_interfaces/msg/Velocity "{linear_x: 2.5, angular_z: 1.2}"