文档
ROS 2 发布者/订阅者:Python 传感器节点
目标
用 Python 实现 ROS 2 节点:温度传感器发布者 + 数据处理订阅者。覆盖话题、QoS、launch 文件、参数配置。
前提
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src && cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --node-name temp_sensor temp_monitor
cd ~/ros2_ws && colcon build && source install/setup.bash
一、自定义消息接口
temp_monitor/msg/Temperature.msg:
std_msgs/Header header # 时间戳 + frame_id
float64 temperature # 温度值 (℃)
float64 humidity # 湿度 (%)
string sensor_id # 传感器ID
temp_monitor/msg/TemperatureAlert.msg:
std_msgs/Header header
float64 temperature
string level # "normal" / "warning" / "critical"
string message
更新 package.xml 和 CMakeLists.txt(略),然后构建:
colcon build --packages-select temp_monitor
source install/setup.bash
二、温度发布者节点
#!/usr/bin/env python3
"""temp_sensor_node.py — 模拟温度传感器"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from temp_monitor.msg import Temperature
import random
import math
import time
class TempSensorNode(Node):
def __init__(self):
super().__init__('temp_sensor')
# 声明参数(可运行时覆盖)
self.declare_parameter('sensor_id', 'sensor_01')
self.declare_parameter('publish_rate', 1.0) # Hz
self.declare_parameter('base_temp', 25.0) # 基准温度
self.declare_parameter('noise_std', 0.5) # 噪声标准差
self.sensor_id = self.get_parameter('sensor_id').value
# QoS:传感器数据用 BEST_EFFORT + 保留最新
qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
self.publisher = self.create_publisher(Temperature, 'sensor/temperature', qos)
# 定时发布
rate = self.get_parameter('publish_rate').value
self.timer = self.create_timer(1.0 / rate, self.timer_callback)
self.get_logger().info(f'🌡️ 传感器 {self.sensor_id} 已就绪 (发布频率: {rate}Hz)')
def timer_callback(self):
base = self.get_parameter('base_temp').value
noise_std = self.get_parameter('noise_std').value
# 模拟温度漂移(正弦波)
drift = 3.0 * math.sin(time.time() * 0.1)
msg = Temperature()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = self.sensor_id
msg.temperature = base + drift + random.gauss(0, noise_std)
msg.humidity = 50.0 + 10.0 * math.sin(time.time() * 0.05) + random.gauss(0, 2.0)
msg.sensor_id = self.sensor_id
self.publisher.publish(msg)
self.get_logger().debug(f'发布: {msg.temperature:.2f}°C')
def main():
rclpy.init()
node = TempSensorNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
三、监控/告警订阅者节点
#!/usr/bin/env python3
"""temp_monitor_node.py — 温度监控与告警"""
import rclpy
from rclpy.node import Node
from temp_monitor.msg import Temperature, TemperatureAlert
THRESHOLD_WARN = 27.0
THRESHOLD_CRIT = 30.0
class TempMonitorNode(Node):
def __init__(self):
super().__init__('temp_monitor')
self.subscription = self.create_subscription(
Temperature, 'sensor/temperature', self.callback, 10)
self.alert_pub = self.create_publisher(TemperatureAlert, 'sensor/alert', 10)
self.get_logger().info('📊 温度监控节点已就绪')
def callback(self, msg: Temperature):
temp = msg.temperature
sensor = msg.sensor_id
# 判定告警级别
if temp >= THRESHOLD_CRIT:
level, log_fn = 'critical', self.get_logger().error
elif temp >= THRESHOLD_WARN:
level, log_fn = 'warning', self.get_logger().warn
else:
level, log_fn = 'normal', self.get_logger().info
log_fn(f'[{sensor}] {temp:.2f}°C → {level}')
# 发布告警
alert = TemperatureAlert()
alert.header.stamp = self.get_clock().now().to_msg()
alert.temperature = temp
alert.level = level
alert.message = f'{sensor}: {temp:.2f}°C exceeds threshold'
self.alert_pub.publish(alert)
def main():
rclpy.init()
node = TempMonitorNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
四、Launch 文件(一键启动)
# launch/temp_monitor.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
return LaunchDescription([
DeclareLaunchArgument('sensor_id', default_value='sensor_01'),
DeclareLaunchArgument('publish_rate', default_value='2.0'),
Node(
package='temp_monitor',
executable='temp_sensor',
name='temp_sensor',
parameters=[{
'sensor_id': LaunchConfiguration('sensor_id'),
'publish_rate': LaunchConfiguration('publish_rate'),
'base_temp': 25.0
}]
),
Node(
package='temp_monitor',
executable='temp_monitor',
name='temp_monitor'
),
])
五、运行
# 构建
cd ~/ros2_ws && colcon build --packages-select temp_monitor
source install/setup.bash
# 启动
ros2 launch temp_monitor temp_monitor.launch.py
# 带参数启动
ros2 launch temp_monitor temp_monitor.launch.py sensor_id:=kitchen publish_rate:=5.0
# 命令行查询话题
ros2 topic list
ros2 topic echo /sensor/temperature
ros2 topic hz /sensor/temperature
关键点
- ROS 2 节点是独立进程,通过 DDS 自动发现并通信
- QoS BEST_EFFORT 适合传感器数据(容忍丢帧,追求实时)
- 参数声明后可
ros2 param set运行时修改 - Launch 文件统一管理多节点启动和参数