ROS 2 系列 4 - Action 编程

Action 是 ROS 2 中专门用于长时间运行任务的通信机制。它由三部分组成:

组成部分方向说明
Goal(目标)客户端 → 服务器任务指令,告诉服务器要做什么
Feedback(反馈)服务器 → 客户端任务执行过程中周期性地返回进度信息
Result(结果)服务器 → 客户端任务完成后返回的最终结果

Action 的功能类似于 Service,但有两个关键区别:

  1. 可取消:客户端可以在任务执行过程中发送取消请求
  1. 带反馈:服务器可以持续向客户端报告执行进度

Action 采用客户端-服务器模型,客户端发送目标(Goal),服务器执行任务且返回反馈(Feedback)和结果(Result)。


1. 创建自定义 Action 接口

与 Topic 和 Service 类似,使用 Action 之前需要先定义接口。Action 接口定义在 .action 文件中。

1.1. 创建接口包

cd ~/ros2_ws/src
ros2 pkg create --license Apache-2.0 custom_action_interfaces
cd custom_action_interfaces
mkdir action

1.2. 定义 Fibonacci.action

在 action 目录下创建 Fibonacci.action 文件,内容如下:

int32 order
---
int32[] sequence
---
int32[] partial_sequence
  • --- 上方:Goal,表示要计算的斐波那契数列的阶数
  • 第一个 --- 下方:Result,表示计算完成的完整数列
  • 第二个 --- 下方:Feedback,表示当前已计算的部分数列

1.3. 修改 CMakeLists.txt

在 CMakeLists.txt 中添加以下内容:

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/Fibonacci.action"
)

1.4. 修改 package.xml

在 package.xml 中添加依赖:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

1.5. 编译接口包

cd ~/ros2_ws
colcon build --packages-select custom_action_interfaces
source install/setup.bash

2. 编写 Action 服务器

下面编写计算斐波那契数列的 Action 服务器。

在工作空间 src 目录下创建功能包 action_demo_py

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 action_demo_py

在 action_demo_py/action_demo_py 目录下创建 fibonacci_action_server.py

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from custom_action_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        # 创建 Action 服务器
        # 参数:节点本身,Action 类型,Action 名称,执行回调函数
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback
        )
        self.get_logger().info('Fibonacci Action Server 已启动')

    def execute_callback(self, goal_handle):
        """接收到 Goal 时执行的回调函数"""
        self.get_logger().info('开始执行目标...')

        # 1. 从 goal 中获取请求参数
        order = goal_handle.request.order

        # 2. 初始化反馈和结果对象
        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        result_msg = Fibonacci.Result()

        # 3. 执行计算,周期性发送反馈
        for i in range(1, order):
            # 检查客户端是否请求取消
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                self.get_logger().info('目标已被取消')
                return result_msg

            # 计算下一个斐波那契数
            next_fib = feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
            feedback_msg.partial_sequence.append(next_fib)

            # 发布反馈
            self.get_logger().info(f'反馈: {feedback_msg.partial_sequence}')
            goal_handle.publish_feedback(feedback_msg)

        # 4. 任务完成,设置最终结果
        result_msg.sequence = feedback_msg.partial_sequence
        goal_handle.succeed()

        self.get_logger().info(f'结果: {result_msg.sequence}')
        return result_msg


def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionServer()
    rclpy.spin(node)


if __name__ == '__main__':
    main()

代码解析

  • ActionServer 需要四个参数:节点本身、Action 类型、Action 名称、执行回调函数
  • execute_callback 是核心方法,在接收到 Goal 时被调用
  • goal_handle.is_cancel_requested 用于检查客户端是否请求取消
  • goal_handle.publish_feedback() 用于向客户端发送进度反馈
  • goal_handle.succeed() 标记任务成功完成

3. 编写 Action 客户端

在 action_demo_py/action_demo_py 目录下创建 fibonacci_action_client.py

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from custom_action_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        # 创建 Action 客户端
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        # 1. 创建 Goal 消息
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        # 2. 等待服务器可用
        self._action_client.wait_for_server()

        # 3. 异步发送 Goal,并注册反馈回调
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )

        # 4. 为异步操作添加完成回调
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        """处理服务器的响应(接受或拒绝 Goal)"""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('目标被拒绝 :(')
            return

        self.get_logger().info('目标被接受 :)')

        # 获取最终结果的 Future
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        """处理最终结果"""
        result = future.result().result
        self.get_logger().info(f'最终结果: {result.sequence}')
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        """处理周期性反馈"""
        feedback = feedback_msg.feedback
        self.get_logger().info(f'收到反馈: {feedback.partial_sequence}')


def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionClient()
    node.send_goal(10)  # 计算 10 阶斐波那契数列
    rclpy.spin(node)


if __name__ == '__main__':
    main()

代码解析

  • ActionClient 需要三个参数:节点本身、Action 类型、Action 名称
  • send_goal_async() 异步发送 Goal,可传入 feedback_callback 接收反馈
  • goal_response_callback 在服务器响应时触发(接受或拒绝)
  • get_result_async() 在 Goal 被接受后,异步等待最终结果
  • feedback_callback 每次收到反馈时触发

4. 配置 setup.py

修改 action_demo_py/setup.py,添加入口点:

entry_points={
    'console_scripts': [
        'fibonacci_action_server = action_demo_py.fibonacci_action_server:main',
        'fibonacci_action_client = action_demo_py.fibonacci_action_client:main',
    ],
},

5. 编译与运行

5.1. 编译

cd ~/ros2_ws
colcon build --packages-select action_demo_py
source install/setup.bash

5.2. 运行服务器和客户端

在终端 1 中:

source install/setup.bash
ros2 run action_demo_py fibonacci_action_server

在终端 2 中:

source install/setup.bash
ros2 run action_demo_py fibonacci_action_client

6. 命令行工具

ROS 2 提供丰富的命令行工具调试 Action。

6.1. 查看所有 Action

ros2 action list

6.2. 查看 Action 详细信息

ros2 action info /fibonacci

6.3. 查看 Action 类型

ros2 action list -t

6.4. 手动发送 Action 目标

ros2 action send_goal /fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"

加上 --feedback 可以同时查看反馈:

ros2 action send_goal /fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}" --feedback