ROS 2 系列 4 - Action 编程
Action 是 ROS 2 中专门用于长时间运行任务的通信机制。它由三部分组成:
| 组成部分 | 方向 | 说明 |
|---|---|---|
| Goal(目标) | 客户端 → 服务器 | 任务指令,告诉服务器要做什么 |
| Feedback(反馈) | 服务器 → 客户端 | 任务执行过程中周期性地返回进度信息 |
| Result(结果) | 服务器 → 客户端 | 任务完成后返回的最终结果 |
Action 的功能类似于 Service,但有两个关键区别:
- 可取消:客户端可以在任务执行过程中发送取消请求
- 带反馈:服务器可以持续向客户端报告执行进度
Action 采用客户端-服务器模型,客户端发送目标(Goal),服务器执行任务且返回反馈(Feedback)和结果(Result)。
1. 创建自定义 Action 接口
与 Topic 和 Service 类似,使用 Action 之前需要先定义接口。Action 接口定义在 .action 文件中。
1.1. 创建接口包
cd ~/ros2_ws/src
ros2 pkg create --license Apache-2.0 custom_action_interfaces
cd custom_action_interfaces
mkdir action1.2. 定义 Fibonacci.action
在 action 目录下创建 Fibonacci.action 文件,内容如下:
int32 order
---
int32[] sequence
---
int32[] partial_sequence---上方:Goal,表示要计算的斐波那契数列的阶数
- 第一个
---下方:Result,表示计算完成的完整数列
- 第二个
---下方:Feedback,表示当前已计算的部分数列
1.3. 修改 CMakeLists.txt
在 CMakeLists.txt 中添加以下内容:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/Fibonacci.action"
)1.4. 修改 package.xml
在 package.xml 中添加依赖:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>1.5. 编译接口包
cd ~/ros2_ws
colcon build --packages-select custom_action_interfaces
source install/setup.bash2. 编写 Action 服务器
下面编写计算斐波那契数列的 Action 服务器。
在工作空间 src 目录下创建功能包 action_demo_py:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 action_demo_py在 action_demo_py/action_demo_py 目录下创建 fibonacci_action_server.py:
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from custom_action_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
# 创建 Action 服务器
# 参数:节点本身,Action 类型,Action 名称,执行回调函数
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback
)
self.get_logger().info('Fibonacci Action Server 已启动')
def execute_callback(self, goal_handle):
"""接收到 Goal 时执行的回调函数"""
self.get_logger().info('开始执行目标...')
# 1. 从 goal 中获取请求参数
order = goal_handle.request.order
# 2. 初始化反馈和结果对象
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
result_msg = Fibonacci.Result()
# 3. 执行计算,周期性发送反馈
for i in range(1, order):
# 检查客户端是否请求取消
if goal_handle.is_cancel_requested:
goal_handle.canceled()
self.get_logger().info('目标已被取消')
return result_msg
# 计算下一个斐波那契数
next_fib = feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
feedback_msg.partial_sequence.append(next_fib)
# 发布反馈
self.get_logger().info(f'反馈: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)
# 4. 任务完成,设置最终结果
result_msg.sequence = feedback_msg.partial_sequence
goal_handle.succeed()
self.get_logger().info(f'结果: {result_msg.sequence}')
return result_msg
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionServer()
rclpy.spin(node)
if __name__ == '__main__':
main()代码解析:
ActionServer需要四个参数:节点本身、Action 类型、Action 名称、执行回调函数
execute_callback是核心方法,在接收到 Goal 时被调用
goal_handle.is_cancel_requested用于检查客户端是否请求取消
goal_handle.publish_feedback()用于向客户端发送进度反馈
goal_handle.succeed()标记任务成功完成
3. 编写 Action 客户端
在 action_demo_py/action_demo_py 目录下创建 fibonacci_action_client.py:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from custom_action_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
# 创建 Action 客户端
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
# 1. 创建 Goal 消息
goal_msg = Fibonacci.Goal()
goal_msg.order = order
# 2. 等待服务器可用
self._action_client.wait_for_server()
# 3. 异步发送 Goal,并注册反馈回调
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
# 4. 为异步操作添加完成回调
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""处理服务器的响应(接受或拒绝 Goal)"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('目标被拒绝 :(')
return
self.get_logger().info('目标被接受 :)')
# 获取最终结果的 Future
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
"""处理最终结果"""
result = future.result().result
self.get_logger().info(f'最终结果: {result.sequence}')
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
"""处理周期性反馈"""
feedback = feedback_msg.feedback
self.get_logger().info(f'收到反馈: {feedback.partial_sequence}')
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionClient()
node.send_goal(10) # 计算 10 阶斐波那契数列
rclpy.spin(node)
if __name__ == '__main__':
main()代码解析:
ActionClient需要三个参数:节点本身、Action 类型、Action 名称
send_goal_async()异步发送 Goal,可传入feedback_callback接收反馈
goal_response_callback在服务器响应时触发(接受或拒绝)
get_result_async()在 Goal 被接受后,异步等待最终结果
feedback_callback每次收到反馈时触发
4. 配置 setup.py
修改 action_demo_py/setup.py,添加入口点:
entry_points={
'console_scripts': [
'fibonacci_action_server = action_demo_py.fibonacci_action_server:main',
'fibonacci_action_client = action_demo_py.fibonacci_action_client:main',
],
},5. 编译与运行
5.1. 编译
cd ~/ros2_ws
colcon build --packages-select action_demo_py
source install/setup.bash5.2. 运行服务器和客户端
在终端 1 中:
source install/setup.bash
ros2 run action_demo_py fibonacci_action_server在终端 2 中:
source install/setup.bash
ros2 run action_demo_py fibonacci_action_client6. 命令行工具
ROS 2 提供丰富的命令行工具调试 Action。
6.1. 查看所有 Action
ros2 action list6.2. 查看 Action 详细信息
ros2 action info /fibonacci6.3. 查看 Action 类型
ros2 action list -t6.4. 手动发送 Action 目标
ros2 action send_goal /fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"加上 --feedback 可以同时查看反馈:
ros2 action send_goal /fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}" --feedback