ROS 2 发布者/订阅者:Python 传感器节点
目标
用 Python 实现 ROS 2 节点:温度传感器发布者 + 数据处理订阅者。覆盖话题、QoS、launch 文件、参数配置。
前提
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src &
ros2 pkg create --build-type ament_python --node-name temp_sensor temp_monitor
cd ~/ros2_ws &
一、自定义消息接口
temp_monitor/msg/Temperature.msg:
std_msgs/Header header # 时间戳 + frame_id
float64 temperature # 温度值 (℃)
float64 humidity # 湿度 (%)
string sensor_id # 传感器ID
temp_monitor/msg/TemperatureAlert.msg:
std_msgs/Header header
float64 temperature
string level # "normal" / "warning" / "critical"
string message
更新 package.xml 和 CMakeLists.txt(略),然后构建:
colcon build --packages-select temp_monitor
source install/setup.bash
二、温度发布者节点
#!/usr/bin/env python3
"""temp_sensor_node.py — 模拟温度传感器"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from temp_monitor.msg import Temperature
import random
import math
import time
class TempSensorNode(Node):
def __init__(self):
super().__init__('temp_sensor')
# 声明参数(可运行时覆盖)
self.declare_parameter('sensor_id', 'sensor_01')
self.declare_parameter('publish_rate', 1.0) # Hz
self.declare_parameter('base_temp', 25.0) # 基准温度
self.declare_parameter('noise_std', 0.5) # 噪声标准差
self.sensor_id = self.get_parameter('sensor_id').value
# QoS:传感器数据用 BEST_EFFORT + 保留最新
qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
self.publisher = self.create_publisher(Temperature, 'sensor/temperature', qos)
# 定时发布
rate = self.get_parameter('publish_rate').value
self.timer = self.create_timer(1.0 / rate, self.timer_callback)
self.get_logger().info(f'🌡️ 传感器 {self.sensor_id} 已就绪 (发布频率: {rate}Hz)')
def timer_callback(self):
base = self.get_parameter('base_temp').value
noise_std = self.get_parameter('noise_std').value
# 模拟温度漂移(正弦波)
drift = 3.0 * math.sin(time.time() * 0.1)
msg = Temperature()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = self.sensor_id
msg.temperature = base + drift + random.gauss(0, noise_std)
msg.humidity = 50.0 + 10.0 * math.sin(time.time() * 0.05) + random.gauss(0, 2.0)
msg.sensor_id = self.sensor_id
self.publisher.publish(msg)
self.get_logger().debug(f'发布: {msg.temperature:.2f}°C')
def main():
rclpy.init()
node = TempSensorNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
三、监控/告警订阅者节点
#!/usr/bin/env python3
"""temp_monitor_node.py — 温度监控与告警"""
import rclpy
from rclpy.node import Node
from temp_monitor.msg import Temperature, TemperatureAlert
THRESHOLD_WARN = 27.0
THRESHOLD_CRIT = 30.0
class TempMonitorNode(Node):
def __init__(self):
super().__init__('temp_monitor')
self.subscription = self.create_subscription(
Temperature, 'sensor/temperature', self.callback, 10)
self.alert_pub = self.create_publisher(TemperatureAlert, 'sensor/alert', 10)
self.get_logger().info('📊 温度监控节点已就绪')
def callback(self, msg: Temperature):
temp = msg.temperature
sensor = msg.sensor_id
# 判定告警级别
if temp >= THRESHOLD_CRIT:
level, log_fn = 'critical', self.get_logger().error
elif temp >= THRESHOLD_WARN:
level, log_fn = 'warning', self.get_logger().warn
else:
level, log_fn = 'normal', self.get_logger().info
log_fn(f'[{sensor}] {temp:.2f}°C → {level}')
# 发布告警
alert = TemperatureAlert()
alert.header.stamp = self.get_clock().now().to_msg()
alert.temperature = temp
alert.level = level
alert.message = f'{sensor}: {temp:.2f}°C exceeds threshold'
self.alert_pub.publish(alert)
def main():
rclpy.init()
node = TempMonitorNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
四、Launch 文件(一键启动)
# launch/temp_monitor.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
return LaunchDescription([
DeclareLaunchArgument('sensor_id', default_value='sensor_01'),
DeclareLaunchArgument('publish_rate', default_value='2.0'),
Node(
package='temp_monitor',
executable='temp_sensor',
name='temp_sensor',
parameters=[{
'sensor_id': LaunchConfiguration('sensor_id'),
'publish_rate': LaunchConfiguration('publish_rate'),
'base_temp': 25.0
}]
),
Node(
package='temp_monitor',
executable='temp_monitor',
name='temp_monitor'
),
])
五、运行
cd ~/ros2_ws &
source install/setup.bash
ros2 launch temp_monitor temp_monitor.launch.py
ros2 launch temp_monitor temp_monitor.launch.py sensor_id:=kitchen publish_rate:=5.0
ros2 topic list
ros2 topic echo /sensor/temperature
ros2 topic hz /sensor/temperature
关键点
- ROS 2 节点是独立进程,通过 DDS 自动发现并通信
- QoS BEST_EFFORT 适合传感器数据(容忍丢帧,追求实时)
- 参数声明后可
ros2 param set 运行时修改
- Launch 文件统一管理多节点启动和参数
ROS 2 进阶:自定义 Service + Action 服务器实现机器人抓取
目标
实现 ROS 2 Service(同步请求/响应)和 Action(长耗时带反馈的任务),模拟机器人抓取操作。
一、自定义服务接口
# msg/MoveArm.srv — 机械臂移动到指定位置
geometry_msgs/Point target # 请求:目标坐标 (x,y,z)
---
bool success # 响应:成功与否
string message # 响应:附加信息
# action/GraspObject.action — 抓取物体(可能耗时 5-10 秒)
string object_id # 目标物体 ID
---
bool success # 结果
int32 attempts # 尝试次数
---
float32 progress # 反馈(0.0-1.0)
string current_stage # 当前阶段描述
二、Service 服务器
#!/usr/bin/env python3
"""arm_controller.py — 机械臂 Service 服务器"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from robot_msgs.srv import MoveArm
import math
import time
class ArmController(Node):
def __init__(self):
super().__init__('arm_controller')
# 当前位置
self.current_pos = Point(x=0.0, y=0.0, z=0.1)
# 创建 Service
self.srv = self.create_service(
MoveArm, 'arm/move_to', self.move_callback)
self.get_logger().info('🦾 机械臂控制器就绪')
def move_callback(self, request, response):
"""处理移动请求"""
target = request.target
self.get_logger().info(f'收到移动指令: ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})')
# 计算距离
dx = target.x - self.current_pos.x
dy = target.y - self.current_pos.y
dz = target.z - self.current_pos.z
distance = math.sqrt(dx*dx + dy*dy + dz*dz)
# 验证工作空间(简化为球体半径 1.0m)
if distance > 1.0:
response.success = False
response.message = f'目标超出工作空间 (距离={distance:.2f}m > 1.0m)'
self.get_logger().warn(response.message)
return response
# 模拟移动(实际需硬件控制)
self.get_logger().info(f'移动中... 距离 {distance:.2f}m')
time.sleep(0.5) # 模拟运动时间
# 更新位置
self.current_pos = target
response.success = True
response.message = f'已到达 ({target.x:.2f}, {target.y:.2f}, {target.z:.2f})'
self.get_logger().info(response.message)
return response
def main():
rclpy.init()
node = ArmController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
三、Action 服务器(长耗时抓取任务)
#!/usr/bin/env python3
"""grasp_server.py — 抓取 Action 服务器"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse
from rclpy.action.server import ServerGoalHandle
from robot_msgs.action import GraspObject
import time
class GraspServer(Node):
def __init__(self):
super().__init__('grasp_server')
self._action_server = ActionServer(
self,
GraspObject,
'grasp_object',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback
)
self.get_logger().info('🤖 抓取服务器就绪')
def goal_callback(self, goal_request):
"""接收目标请求——可以在此拒绝"""
obj_id = goal_request.object_id
self.get_logger().info(f'收到抓取目标: {obj_id}')
# 拒绝未知物体
if obj_id.startswith('unknown'):
self.get_logger().warn(f'拒绝: 未知物体 {obj_id}')
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle: ServerGoalHandle):
"""处理取消请求"""
self.get_logger().info('收到取消请求')
return rclpy.action.CancelResponse.ACCEPT
def execute_callback(self, goal_handle: ServerGoalHandle):
"""执行抓取任务(长耗时)"""
obj_id = goal_handle.request.object_id
stages = [
(0.0, 0.3, f'接近物体 {obj_id}'),
(0.3, 0.7, f'闭合夹爪'),
(0.7, 0.8, f'提升物体'),
(0.8, 1.0, f'移动到放置位置')
]
feedback_msg = GraspObject.Feedback()
result = GraspObject.Result()
attempts = 0
for start_pct, end_pct, stage_desc in stages:
# 检查取消
if goal_handle.is_cancel_requested:
result.success = False
result.attempts = attempts
goal_handle.canceled()
self.get_logger().info('抓取已取消')
return result
self.get_logger().info(f'阶段: {stage_desc}')
# 模拟阶段执行
steps = 10
for i in range(steps):
progress = start_pct + (end_pct - start_pct) * (i / steps)
feedback_msg.progress = progress
feedback_msg.current_stage = stage_desc
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.1) # 100ms/step → 1 秒/阶段
attempts += 1
# 任务完成
result.success = True
result.attempts = attempts
goal_handle.succeed()
self.get_logger().info(f'✅ 抓取完成: {obj_id}')
return result
def main():
rclpy.init()
node = GraspServer()
rclpy.spin(node)
四、客户端调用
ros2 service call /arm/move_to robot_msgs/srv/MoveArm \
"{target: {x: 0.5, y: 0.2, z: 0.3}}"
ros2 action send_goal /grasp_object robot_msgs/action/GraspObject \
"{object_id: 'red_cube'}" \
--feedback
五、关键点
- Service vs Action:Service 适合快速请求/响应(<1 秒),Action 适合长耗时+需反馈+可取消的任务
- Goal 回调:可在接受前验证目标(如超出工作空间则拒绝)
- 反馈频率:建议 10-20Hz,过高浪费带宽
- 取消语义:服务器主动检查
is_cancel_requested,不能完全依赖回调