进阶:Service + Action 实现机器人抓取

知识库
知识库文档
/tech-stacks/ros2/examples/进阶:Service + Action 实现机器人抓取.md

文档

ROS 2 进阶:自定义 Service + Action 服务器实现机器人抓取

目标

实现 ROS 2 Service(同步请求/响应)和 Action(长耗时带反馈的任务),模拟机器人抓取操作。

一、自定义服务接口

# msg/MoveArm.srv — 机械臂移动到指定位置
geometry_msgs/Point target        # 请求:目标坐标 (x,y,z)
---
bool success                      # 响应:成功与否
string message                    # 响应:附加信息
# action/GraspObject.action — 抓取物体(可能耗时 5-10 秒)
string object_id                  # 目标物体 ID
---
bool success                      # 结果
int32 attempts                    # 尝试次数
---
float32 progress                  # 反馈(0.0-1.0)
string current_stage              # 当前阶段描述

二、Service 服务器

#!/usr/bin/env python3
"""arm_controller.py — 机械臂 Service 服务器"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from robot_msgs.srv import MoveArm
import math
import time

class ArmController(Node):
    def __init__(self):
        super().__init__('arm_controller')

        # 当前位置
        self.current_pos = Point(x=0.0, y=0.0, z=0.1)

        # 创建 Service
        self.srv = self.create_service(
            MoveArm, 'arm/move_to', self.move_callback)

        self.get_logger().info('🦾 机械臂控制器就绪')

    def move_callback(self, request, response):
        """处理移动请求"""
        target = request.target
        self.get_logger().info(f'收到移动指令: ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})')

        # 计算距离
        dx = target.x - self.current_pos.x
        dy = target.y - self.current_pos.y
        dz = target.z - self.current_pos.z
        distance = math.sqrt(dx*dx + dy*dy + dz*dz)

        # 验证工作空间(简化为球体半径 1.0m)
        if distance > 1.0:
            response.success = False
            response.message = f'目标超出工作空间 (距离={distance:.2f}m > 1.0m)'
            self.get_logger().warn(response.message)
            return response

        # 模拟移动(实际需硬件控制)
        self.get_logger().info(f'移动中... 距离 {distance:.2f}m')
        time.sleep(0.5)  # 模拟运动时间

        # 更新位置
        self.current_pos = target
        response.success = True
        response.message = f'已到达 ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})'
        self.get_logger().info(response.message)
        return response

def main():
    rclpy.init()
    node = ArmController()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

三、Action 服务器(长耗时抓取任务)

#!/usr/bin/env python3
"""grasp_server.py — 抓取 Action 服务器"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse
from rclpy.action.server import ServerGoalHandle
from robot_msgs.action import GraspObject
import time

class GraspServer(Node):
    def __init__(self):
        super().__init__('grasp_server')

        self._action_server = ActionServer(
            self,
            GraspObject,
            'grasp_object',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback
        )

        self.get_logger().info('🤖 抓取服务器就绪')

    def goal_callback(self, goal_request):
        """接收目标请求——可以在此拒绝"""
        obj_id = goal_request.object_id
        self.get_logger().info(f'收到抓取目标: {obj_id}')

        # 拒绝未知物体
        if obj_id.startswith('unknown'):
            self.get_logger().warn(f'拒绝: 未知物体 {obj_id}')
            return GoalResponse.REJECT
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle: ServerGoalHandle):
        """处理取消请求"""
        self.get_logger().info('收到取消请求')
        return rclpy.action.CancelResponse.ACCEPT

    def execute_callback(self, goal_handle: ServerGoalHandle):
        """执行抓取任务(长耗时)"""
        obj_id = goal_handle.request.object_id
        stages = [
            (0.0, 0.3, f'接近物体 {obj_id}'),
            (0.3, 0.7, f'闭合夹爪'),
            (0.7, 0.8, f'提升物体'),
            (0.8, 1.0, f'移动到放置位置')
        ]

        feedback_msg = GraspObject.Feedback()
        result = GraspObject.Result()
        attempts = 0

        for start_pct, end_pct, stage_desc in stages:
            # 检查取消
            if goal_handle.is_cancel_requested:
                result.success = False
                result.attempts = attempts
                goal_handle.canceled()
                self.get_logger().info('抓取已取消')
                return result

            self.get_logger().info(f'阶段: {stage_desc}')

            # 模拟阶段执行
            steps = 10
            for i in range(steps):
                progress = start_pct + (end_pct - start_pct) * (i / steps)
                feedback_msg.progress = progress
                feedback_msg.current_stage = stage_desc

                goal_handle.publish_feedback(feedback_msg)
                time.sleep(0.1)  # 100ms/step → 1 秒/阶段

            attempts += 1

        # 任务完成
        result.success = True
        result.attempts = attempts
        goal_handle.succeed()
        self.get_logger().info(f'✅ 抓取完成: {obj_id}')
        return result

def main():
    rclpy.init()
    node = GraspServer()
    rclpy.spin(node)

四、客户端调用

# Service 调用
ros2 service call /arm/move_to robot_msgs/srv/MoveArm \
    "{target: {x: 0.5, y: 0.2, z: 0.3}}"

# Action 调用
ros2 action send_goal /grasp_object robot_msgs/action/GraspObject \
    "{object_id: 'red_cube'}" \
    --feedback

# 输出:
# Feedback: progress=0.15, current_stage='接近物体 red_cube'
# Feedback: progress=0.45, current_stage='闭合夹爪'
# ...
# Result: success=True, attempts=4

五、关键点

  • Service vs Action:Service 适合快速请求/响应(<1 秒),Action 适合长耗时+需反馈+可取消的任务
  • Goal 回调:可在接受前验证目标(如超出工作空间则拒绝)
  • 反馈频率:建议 10-20Hz,过高浪费带宽
  • 取消语义:服务器主动检查 is_cancel_requested,不能完全依赖回调

信息

路径
/tech-stacks/ros2/examples/进阶:Service + Action 实现机器人抓取.md
更新时间
2026/5/31