发布者_订阅者:Python 实现传感器节点

知识库
知识库文档
/tech-stacks/ros2/examples/发布者_订阅者:Python 实现传感器节点.md

文档

ROS 2 发布者/订阅者:Python 传感器节点

目标

用 Python 实现 ROS 2 节点:温度传感器发布者 + 数据处理订阅者。覆盖话题、QoS、launch 文件、参数配置。

前提

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src &;& cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --node-name temp_sensor temp_monitor
cd ~/ros2_ws &;& colcon build && source install/setup.bash

一、自定义消息接口

temp_monitor/msg/Temperature.msg

std_msgs/Header header   # 时间戳 + frame_id
float64 temperature       # 温度值 (℃)
float64 humidity          # 湿度 (%)
string sensor_id          # 传感器ID

temp_monitor/msg/TemperatureAlert.msg

std_msgs/Header header
float64 temperature
string level              # "normal" / "warning" / "critical"
string message

更新 package.xmlCMakeLists.txt(略),然后构建:

colcon build --packages-select temp_monitor
source install/setup.bash

二、温度发布者节点

#!/usr/bin/env python3
"""temp_sensor_node.py — 模拟温度传感器"""

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from temp_monitor.msg import Temperature
import random
import math
import time

class TempSensorNode(Node):
    def __init__(self):
        super().__init__('temp_sensor')

        # 声明参数(可运行时覆盖)
        self.declare_parameter('sensor_id', 'sensor_01')
        self.declare_parameter('publish_rate', 1.0)       # Hz
        self.declare_parameter('base_temp', 25.0)          # 基准温度
        self.declare_parameter('noise_std', 0.5)           # 噪声标准差

        self.sensor_id = self.get_parameter('sensor_id').value

        # QoS:传感器数据用 BEST_EFFORT + 保留最新
        qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )

        self.publisher = self.create_publisher(Temperature, 'sensor/temperature', qos)

        # 定时发布
        rate = self.get_parameter('publish_rate').value
        self.timer = self.create_timer(1.0 / rate, self.timer_callback)

        self.get_logger().info(f'🌡️ 传感器 {self.sensor_id} 已就绪 (发布频率: {rate}Hz)')

    def timer_callback(self):
        base = self.get_parameter('base_temp').value
        noise_std = self.get_parameter('noise_std').value

        # 模拟温度漂移(正弦波)
        drift = 3.0 * math.sin(time.time() * 0.1)

        msg = Temperature()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = self.sensor_id
        msg.temperature = base + drift + random.gauss(0, noise_std)
        msg.humidity = 50.0 + 10.0 * math.sin(time.time() * 0.05) + random.gauss(0, 2.0)
        msg.sensor_id = self.sensor_id

        self.publisher.publish(msg)
        self.get_logger().debug(f'发布: {msg.temperature:.2f}°C')

def main():
    rclpy.init()
    node = TempSensorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

三、监控/告警订阅者节点

#!/usr/bin/env python3
"""temp_monitor_node.py — 温度监控与告警"""

import rclpy
from rclpy.node import Node
from temp_monitor.msg import Temperature, TemperatureAlert

THRESHOLD_WARN = 27.0
THRESHOLD_CRIT = 30.0

class TempMonitorNode(Node):
    def __init__(self):
        super().__init__('temp_monitor')
        self.subscription = self.create_subscription(
            Temperature, 'sensor/temperature', self.callback, 10)

        self.alert_pub = self.create_publisher(TemperatureAlert, 'sensor/alert', 10)
        self.get_logger().info('📊 温度监控节点已就绪')

    def callback(self, msg: Temperature):
        temp = msg.temperature
        sensor = msg.sensor_id

        # 判定告警级别
        if temp >= THRESHOLD_CRIT:
            level, log_fn = 'critical', self.get_logger().error
        elif temp >= THRESHOLD_WARN:
            level, log_fn = 'warning', self.get_logger().warn
        else:
            level, log_fn = 'normal', self.get_logger().info

        log_fn(f'[{sensor}] {temp:.2f}°C → {level}')

        # 发布告警
        alert = TemperatureAlert()
        alert.header.stamp = self.get_clock().now().to_msg()
        alert.temperature = temp
        alert.level = level
        alert.message = f'{sensor}: {temp:.2f}°C exceeds threshold'
        self.alert_pub.publish(alert)

def main():
    rclpy.init()
    node = TempMonitorNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

四、Launch 文件(一键启动)

# launch/temp_monitor.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    return LaunchDescription([
        DeclareLaunchArgument('sensor_id', default_value='sensor_01'),
        DeclareLaunchArgument('publish_rate', default_value='2.0'),

        Node(
            package='temp_monitor',
            executable='temp_sensor',
            name='temp_sensor',
            parameters=[{
                'sensor_id': LaunchConfiguration('sensor_id'),
                'publish_rate': LaunchConfiguration('publish_rate'),
                'base_temp': 25.0
            }]
        ),
        Node(
            package='temp_monitor',
            executable='temp_monitor',
            name='temp_monitor'
        ),
    ])

五、运行

# 构建
cd ~/ros2_ws &;& colcon build --packages-select temp_monitor
source install/setup.bash

# 启动
ros2 launch temp_monitor temp_monitor.launch.py

# 带参数启动
ros2 launch temp_monitor temp_monitor.launch.py sensor_id:=kitchen publish_rate:=5.0

# 命令行查询话题
ros2 topic list
ros2 topic echo /sensor/temperature
ros2 topic hz /sensor/temperature

关键点

  • ROS 2 节点是独立进程,通过 DDS 自动发现并通信
  • QoS BEST_EFFORT 适合传感器数据(容忍丢帧,追求实时)
  • 参数声明后可 ros2 param set 运行时修改
  • Launch 文件统一管理多节点启动和参数

信息

路径
/tech-stacks/ros2/examples/发布者_订阅者:Python 实现传感器节点.md
更新时间
2026/5/31