ROS 2

技术栈
嵌入式 SDK
ros2机器人dds中间件分布式slam

概览

ROS 2 技术栈概览

ROS 2(Robot Operating System 2)是面向机器人开发的分布式通信与软件框架,由 Open Robotics 于 2017 年发布。基于 DDS(Data Distribution Service)中间件,ROS 2 解决了 ROS 1 的单点故障、非实时、跨平台差等痛点,支持从嵌入式 MCU 到云端服务器的全栈机器人开发。

解决什么问题

  • 分布式协调:多传感器、多执行器节点间可靠通信,自动发现与连接
  • 实时性需求:DDS 提供 QoS 策略,满足机器人控制闭环的高实时要求
  • 跨平台部署:同一套代码运行在 x86 工控机、ARM 边缘设备、MCU(micro-ROS)
  • 仿真与真机无缝切换:Gazebo/Ignition 仿真与物理机器人使用相同节点代码

关键特性

  • DDS 通信层:去中心化,无 Master 节点,支持自动发现
  • QoS 策略:可靠性、持久性、截止时间、优先级等 20+ 策略
  • 节点生命周期:managed nodes 支持状态机控制(unconfigured→inactive→active)
  • micro-ROS:面向 MCU 的轻量版,可在 FreeRTOS/Zephyr 上运行
  • 丰富工具链:RViz2 可视化、ros2bag 数据录制、rqt 调试、colcon 构建

安装

ROS 2 安装指南

1. 环境准备

平台 推荐版本
Ubuntu 22.04 LTS (ROS 2 Humble,LTS 至 2027)
Windows Windows 10/11(实验性支持)
macOS 仅开发/仿真用途(非官方 Tier 1 支持)

硬件建议

  • 开发机:8GB+ RAM,SSD
  • 机器人部署:NVIDIA Jetson / Raspberry Pi 4/5 / x86 工控机

依赖:Python 3.8+、CMake 3.8+、colcon(构建工具)

2. 安装步骤

Ubuntu 22.04 + ROS 2 Humble(推荐)

# 1. 设置 locale
sudo apt update &;& sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

# 2. 添加 ROS 2 apt 仓库
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update &;& sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg

echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list >; /dev/null

# 3. 安装 ROS 2
sudo apt update
sudo apt install ros-humble-desktop    # 含 RViz2 + Gazebo
# 或 ros-humble-ros-base(无 GUI 工具)

# 4. 环境设置
echo "source /opt/ros/humble/setup.bash" >;> ~/.bashrc
source ~/.bashrc

# 5. 安装 colcon 和 rosdep
sudo apt install python3-colcon-common-extensions python3-rosdep2
sudo rosdep init
rosdep update

验证安装

# 两个终端分别运行
ros2 run demo_nodes_cpp talker
ros2 run demo_nodes_py listener
# 应看到 Hello World 互相通信

Docker 方式(隔离环境)

docker pull osrf/ros:humble-desktop
docker run -it --rm --net=host -e DISPLAY=$DISPLAY osrf/ros:humble-desktop

3. 常见安装问题

问题 解决方案
rosdep init 失败 网络问题;配置代理或手动下载 20-default.list/etc/ros/rosdep/
colcon build 报错 检查 CMake 版本 ≥ 3.8;缺少依赖用 rosdep install -i --from-path src
RViz2 无法启动 检查 GPU 驱动和 OpenGL:`glxinfo
talker/listener 不通 检查是否在同一 ROS_DOMAIN_ID;检查防火墙
Python 包冲突 建议在虚拟环境或 Docker 中使用

示例

ROS 2 发布者/订阅者:Python 传感器节点

目标

用 Python 实现 ROS 2 节点:温度传感器发布者 + 数据处理订阅者。覆盖话题、QoS、launch 文件、参数配置。

前提

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src &;& cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --node-name temp_sensor temp_monitor
cd ~/ros2_ws &;& colcon build && source install/setup.bash

一、自定义消息接口

temp_monitor/msg/Temperature.msg

std_msgs/Header header   # 时间戳 + frame_id
float64 temperature       # 温度值 (℃)
float64 humidity          # 湿度 (%)
string sensor_id          # 传感器ID

temp_monitor/msg/TemperatureAlert.msg

std_msgs/Header header
float64 temperature
string level              # "normal" / "warning" / "critical"
string message

更新 package.xmlCMakeLists.txt(略),然后构建:

colcon build --packages-select temp_monitor
source install/setup.bash

二、温度发布者节点

#!/usr/bin/env python3
"""temp_sensor_node.py — 模拟温度传感器"""

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from temp_monitor.msg import Temperature
import random
import math
import time

class TempSensorNode(Node):
    def __init__(self):
        super().__init__('temp_sensor')

        # 声明参数(可运行时覆盖)
        self.declare_parameter('sensor_id', 'sensor_01')
        self.declare_parameter('publish_rate', 1.0)       # Hz
        self.declare_parameter('base_temp', 25.0)          # 基准温度
        self.declare_parameter('noise_std', 0.5)           # 噪声标准差

        self.sensor_id = self.get_parameter('sensor_id').value

        # QoS:传感器数据用 BEST_EFFORT + 保留最新
        qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )

        self.publisher = self.create_publisher(Temperature, 'sensor/temperature', qos)

        # 定时发布
        rate = self.get_parameter('publish_rate').value
        self.timer = self.create_timer(1.0 / rate, self.timer_callback)

        self.get_logger().info(f'🌡️ 传感器 {self.sensor_id} 已就绪 (发布频率: {rate}Hz)')

    def timer_callback(self):
        base = self.get_parameter('base_temp').value
        noise_std = self.get_parameter('noise_std').value

        # 模拟温度漂移(正弦波)
        drift = 3.0 * math.sin(time.time() * 0.1)

        msg = Temperature()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = self.sensor_id
        msg.temperature = base + drift + random.gauss(0, noise_std)
        msg.humidity = 50.0 + 10.0 * math.sin(time.time() * 0.05) + random.gauss(0, 2.0)
        msg.sensor_id = self.sensor_id

        self.publisher.publish(msg)
        self.get_logger().debug(f'发布: {msg.temperature:.2f}°C')

def main():
    rclpy.init()
    node = TempSensorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

三、监控/告警订阅者节点

#!/usr/bin/env python3
"""temp_monitor_node.py — 温度监控与告警"""

import rclpy
from rclpy.node import Node
from temp_monitor.msg import Temperature, TemperatureAlert

THRESHOLD_WARN = 27.0
THRESHOLD_CRIT = 30.0

class TempMonitorNode(Node):
    def __init__(self):
        super().__init__('temp_monitor')
        self.subscription = self.create_subscription(
            Temperature, 'sensor/temperature', self.callback, 10)

        self.alert_pub = self.create_publisher(TemperatureAlert, 'sensor/alert', 10)
        self.get_logger().info('📊 温度监控节点已就绪')

    def callback(self, msg: Temperature):
        temp = msg.temperature
        sensor = msg.sensor_id

        # 判定告警级别
        if temp >= THRESHOLD_CRIT:
            level, log_fn = 'critical', self.get_logger().error
        elif temp >= THRESHOLD_WARN:
            level, log_fn = 'warning', self.get_logger().warn
        else:
            level, log_fn = 'normal', self.get_logger().info

        log_fn(f'[{sensor}] {temp:.2f}°C → {level}')

        # 发布告警
        alert = TemperatureAlert()
        alert.header.stamp = self.get_clock().now().to_msg()
        alert.temperature = temp
        alert.level = level
        alert.message = f'{sensor}: {temp:.2f}°C exceeds threshold'
        self.alert_pub.publish(alert)

def main():
    rclpy.init()
    node = TempMonitorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

四、Launch 文件(一键启动)

# launch/temp_monitor.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    return LaunchDescription([
        DeclareLaunchArgument('sensor_id', default_value='sensor_01'),
        DeclareLaunchArgument('publish_rate', default_value='2.0'),

        Node(
            package='temp_monitor',
            executable='temp_sensor',
            name='temp_sensor',
            parameters=[{
                'sensor_id': LaunchConfiguration('sensor_id'),
                'publish_rate': LaunchConfiguration('publish_rate'),
                'base_temp': 25.0
            }]
        ),
        Node(
            package='temp_monitor',
            executable='temp_monitor',
            name='temp_monitor'
        ),
    ])

五、运行

# 构建
cd ~/ros2_ws &;& colcon build --packages-select temp_monitor
source install/setup.bash

# 启动
ros2 launch temp_monitor temp_monitor.launch.py

# 带参数启动
ros2 launch temp_monitor temp_monitor.launch.py sensor_id:=kitchen publish_rate:=5.0

# 命令行查询话题
ros2 topic list
ros2 topic echo /sensor/temperature
ros2 topic hz /sensor/temperature

关键点

  • ROS 2 节点是独立进程,通过 DDS 自动发现并通信
  • QoS BEST_EFFORT 适合传感器数据(容忍丢帧,追求实时)
  • 参数声明后可 ros2 param set 运行时修改
  • Launch 文件统一管理多节点启动和参数

ROS 2 进阶:自定义 Service + Action 服务器实现机器人抓取

目标

实现 ROS 2 Service(同步请求/响应)和 Action(长耗时带反馈的任务),模拟机器人抓取操作。

一、自定义服务接口

# msg/MoveArm.srv — 机械臂移动到指定位置
geometry_msgs/Point target        # 请求:目标坐标 (x,y,z)
---
bool success                      # 响应:成功与否
string message                    # 响应:附加信息
# action/GraspObject.action — 抓取物体(可能耗时 5-10 秒)
string object_id                  # 目标物体 ID
---
bool success                      # 结果
int32 attempts                    # 尝试次数
---
float32 progress                  # 反馈(0.0-1.0)
string current_stage              # 当前阶段描述

二、Service 服务器

#!/usr/bin/env python3
"""arm_controller.py — 机械臂 Service 服务器"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from robot_msgs.srv import MoveArm
import math
import time

class ArmController(Node):
    def __init__(self):
        super().__init__('arm_controller')

        # 当前位置
        self.current_pos = Point(x=0.0, y=0.0, z=0.1)

        # 创建 Service
        self.srv = self.create_service(
            MoveArm, 'arm/move_to', self.move_callback)

        self.get_logger().info('🦾 机械臂控制器就绪')

    def move_callback(self, request, response):
        """处理移动请求"""
        target = request.target
        self.get_logger().info(f'收到移动指令: ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})')

        # 计算距离
        dx = target.x - self.current_pos.x
        dy = target.y - self.current_pos.y
        dz = target.z - self.current_pos.z
        distance = math.sqrt(dx*dx + dy*dy + dz*dz)

        # 验证工作空间(简化为球体半径 1.0m)
        if distance > 1.0:
            response.success = False
            response.message = f'目标超出工作空间 (距离={distance:.2f}m > 1.0m)'
            self.get_logger().warn(response.message)
            return response

        # 模拟移动(实际需硬件控制)
        self.get_logger().info(f'移动中... 距离 {distance:.2f}m')
        time.sleep(0.5)  # 模拟运动时间

        # 更新位置
        self.current_pos = target
        response.success = True
        response.message = f'已到达 ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})'
        self.get_logger().info(response.message)
        return response

def main():
    rclpy.init()
    node = ArmController()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

三、Action 服务器(长耗时抓取任务)

#!/usr/bin/env python3
"""grasp_server.py — 抓取 Action 服务器"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse
from rclpy.action.server import ServerGoalHandle
from robot_msgs.action import GraspObject
import time

class GraspServer(Node):
    def __init__(self):
        super().__init__('grasp_server')

        self._action_server = ActionServer(
            self,
            GraspObject,
            'grasp_object',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback
        )

        self.get_logger().info('🤖 抓取服务器就绪')

    def goal_callback(self, goal_request):
        """接收目标请求——可以在此拒绝"""
        obj_id = goal_request.object_id
        self.get_logger().info(f'收到抓取目标: {obj_id}')

        # 拒绝未知物体
        if obj_id.startswith('unknown'):
            self.get_logger().warn(f'拒绝: 未知物体 {obj_id}')
            return GoalResponse.REJECT
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle: ServerGoalHandle):
        """处理取消请求"""
        self.get_logger().info('收到取消请求')
        return rclpy.action.CancelResponse.ACCEPT

    def execute_callback(self, goal_handle: ServerGoalHandle):
        """执行抓取任务(长耗时)"""
        obj_id = goal_handle.request.object_id
        stages = [
            (0.0, 0.3, f'接近物体 {obj_id}'),
            (0.3, 0.7, f'闭合夹爪'),
            (0.7, 0.8, f'提升物体'),
            (0.8, 1.0, f'移动到放置位置')
        ]

        feedback_msg = GraspObject.Feedback()
        result = GraspObject.Result()
        attempts = 0

        for start_pct, end_pct, stage_desc in stages:
            # 检查取消
            if goal_handle.is_cancel_requested:
                result.success = False
                result.attempts = attempts
                goal_handle.canceled()
                self.get_logger().info('抓取已取消')
                return result

            self.get_logger().info(f'阶段: {stage_desc}')

            # 模拟阶段执行
            steps = 10
            for i in range(steps):
                progress = start_pct + (end_pct - start_pct) * (i / steps)
                feedback_msg.progress = progress
                feedback_msg.current_stage = stage_desc

                goal_handle.publish_feedback(feedback_msg)
                time.sleep(0.1)  # 100ms/step → 1 秒/阶段

            attempts += 1

        # 任务完成
        result.success = True
        result.attempts = attempts
        goal_handle.succeed()
        self.get_logger().info(f'✅ 抓取完成: {obj_id}')
        return result

def main():
    rclpy.init()
    node = GraspServer()
    rclpy.spin(node)

四、客户端调用

# Service 调用
ros2 service call /arm/move_to robot_msgs/srv/MoveArm \
    "{target: {x: 0.5, y: 0.2, z: 0.3}}"

# Action 调用
ros2 action send_goal /grasp_object robot_msgs/action/GraspObject \
    "{object_id: 'red_cube'}" \
    --feedback

# 输出:
# Feedback: progress=0.15, current_stage='接近物体 red_cube'
# Feedback: progress=0.45, current_stage='闭合夹爪'
# ...
# Result: success=True, attempts=4

五、关键点

  • Service vs Action:Service 适合快速请求/响应(<1 秒),Action 适合长耗时+需反馈+可取消的任务
  • Goal 回调:可在接受前验证目标(如超出工作空间则拒绝)
  • 反馈频率:建议 10-20Hz,过高浪费带宽
  • 取消语义:服务器主动检查 is_cancel_requested,不能完全依赖回调

教程

ROS 2 机器人开发指南

本章目标

掌握 ROS 2 在机器人项目中的核心概念和实践模式,走通从仿真到真机部署的完整流程。


1. DDS 基础

ROS 2 的核心通信层基于 DDS,这是与 ROS 1 最大的架构变化。

DDS 带来的好处

  • 去中心化:无 Master 节点,每个节点平等
  • 自动发现:节点加入网络后自动互相发现
  • QoS 可配置:每个 Topic 独立策略

常用 DDS 实现

实现 供应商 适用场景
Fast DDS eProsima ROS 2 默认,通用
Cyclone DDS Eclipse 嵌入式 + 云端混合
RTI Connext RTI 安全认证(需要许可证)

2. ROS 2 核心概念图谱

工作空间 (workspace)
  └── 包 (package)
       ├── 节点 (node)         ← 最小执行单元
       ├── 话题 (topic)        ← 发布/订阅通信
       ├── 服务 (service)      ← 同步请求/响应
       ├── 动作 (action)       ← 异步长任务
       └── 参数 (parameter)    ← 运行时配置

Launch 文件 → 批量启动节点 + 参数配置
URDF/Xacro → 机器人模型描述
TF2       → 坐标系变换

3. 典型机器人软件架构

感知层 (Perception)
  ├── /camera/image_raw       (摄像头)
  ├── /lidar/scan             (激光雷达)
  └── /imu/data               (IMU)

决策层 (Planning)
  ├── /local_planner          (局部路径规划)
  ├── /global_planner         (全局路径规划)
  └── /behavior_tree          (行为树)

控制层 (Control)
  ├── /cmd_vel                 (速度指令)
  └── /motor_controller       (电机驱动)

状态层 (State)
  ├── /odom                    (里程计)
  └── /joint_states            (关节状态)

4. Gazebo 仿真集成

<!-- launch/sim.launch.py 核心结构 -->
<launch>
    <!-- 启动 Gazebo -->
    <include file="$(find-pkg-share gazebo_ros)/launch/gazebo.launch.py"/>

    <!-- 生成机器人模型 -->
    <node pkg="gazebo_ros" exec="spawn_entity.py"
          args="-entity my_robot -file $(find-pkg-share my_robot)/urdf/robot.urdf"/>

    <!-- 启动控制器 -->
    <node pkg="controller_manager" exec="ros2_control_node"/>
</launch>

5. 调试工具

# 查看系统拓扑
rqt_graph

# 查看话题数据
ros2 topic echo /cmd_vel
ros2 topic hz /scan     # 话题频率

# 查看节点信息
ros2 node info /controller

# 录制与回放
ros2 bag record -a       # 录制所有
ros2 bag play bag_name/  # 回放

# 参数操作
ros2 param list
ros2 param set /node param value

6. 真机部署注意事项

关注点 建议
网络 工控机 + 开发机同一子网,关闭防火墙
实时性 控制循环用 RT kernel 或 Xenomai
启动 systemd service 自动启动 ros2 launch
日志 ros2 run --log-level DEBUG 调试时
资源 htop 监控 CPU/内存,节点太多考虑合并

思考题

  1. 为什么 ROS 2 选择 DDS 而不是继续用 TCPROS?
  2. Action 的回调函数在哪个线程执行?
  3. 如何设计一个多机器人的 ROS 2 系统?