文档
MQTT Python 客户端:温度传感器发布与订阅
目标
使用 paho-mqtt 库实现一个模拟温度传感器的 Publisher 和监控终端的 Subscriber,理解 MQTT 发布/订阅模式、QoS 和遗嘱消息。
前提
# 启动本地 Broker
mosquitto -v -p 1883 &
# 安装 Python 库
pip install paho-mqtt
一、温度发布者(Sensor)
#!/usr/bin/env python3
"""mqtt_publisher.py — 模拟温度传感器,定期发布数据"""
import paho.mqtt.client as mqtt
import json
import time
import random
import platform
BROKER = "localhost"
PORT = 1883
TOPIC_TEMP = "home/sensor/temperature"
TOPIC_STATUS = "home/sensor/status"
CLIENT_ID = f"sensor_{platform.node()}"
# 遗嘱消息:传感器异常断线时 Broker 自动发布
WILL_TOPIC = "home/sensor/status"
WILL_MESSAGE = json.dumps({"status": "offline", "reason": "connection lost"})
def on_connect(client, userdata, flags, rc):
"""连接回调"""
status_codes = {
0: "连接成功",
1: "协议版本错误",
2: "Client ID 被拒",
3: "服务器不可用",
4: "用户名/密码错误",
5: "未授权"
}
print(f"[MQTT] {status_codes.get(rc, f'未知错误 {rc}')}")
# 发布上线状态
client.publish(TOPIC_STATUS, json.dumps({"status": "online"}), qos=1, retain=True)
def on_publish(client, userdata, mid):
"""发布完成回调(QoS 1/2 时有效)"""
print(f" ✅ 消息已确认 (mid={mid})")
def simulate_temperature():
"""模拟温度传感器读数"""
base = 25.0
drift = 5 * random.random() - 2.5
noise = random.gauss(0, 0.3)
return round(base + drift + noise, 2)
def main():
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5 # MQTT 5.0
)
# 设置遗嘱(MQTT 5)
client.will_set(WILL_TOPIC, WILL_MESSAGE, qos=1, retain=True)
# 绑定回调
client.on_connect = on_connect
client.on_publish = on_publish
# 连接 Broker
client.connect(BROKER, PORT, keepalive=60)
client.loop_start() # 后台网络线程
try:
count = 0
while True:
count += 1
temp = simulate_temperature()
payload = json.dumps({
"sensor_id": CLIENT_ID,
"temperature": temp,
"unit": "celsius",
"timestamp": int(time.time()),
"sequence": count
})
# 发布温度(QoS 1:至少一次)
info = client.publish(TOPIC_TEMP, payload, qos=1)
print(f"📡 发布 #{count}: {temp}°C | QoS=1 | mid={info.mid}")
time.sleep(2)
except KeyboardInterrupt:
print("\n🛑 传感器停止")
# 优雅下线
client.publish(TOPIC_STATUS, json.dumps({"status": "offline", "shutdown": True}),
qos=1, retain=True)
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()
二、温度订阅者(Monitor)
#!/usr/bin/env python3
"""mqtt_subscriber.py — 温度监控终端"""
import paho.mqtt.client as mqtt
import json
BROKER = "localhost"
PORT = 1883
# 使用通配符订阅所有传感器
TOPIC_TEMP = "home/+/temperature" # + 单层通配符
TOPIC_STATUS = "home/+/status"
ALERT_THRESHOLD = 28.0 # 温度告警阈值
def on_connect(client, userdata, flags, rc):
print(f"[MQTT] 已连接到 Broker (rc={rc})")
client.subscribe([(TOPIC_TEMP, 1), (TOPIC_STATUS, 1)])
print(f"[MQTT] 订阅: {TOPIC_TEMP}, {TOPIC_STATUS}")
def on_message(client, userdata, msg):
"""消息回调"""
try:
data = json.loads(msg.payload)
except json.JSONDecodeError:
print(f"⚠️ 无法解析: {msg.topic} → {msg.payload}")
return
topic = msg.topic
if topic.endswith("/status"):
status = data.get("status", "unknown")
sensor = topic.split("/")[1]
emoji = "🟢" if status == "online" else "🔴"
print(f"{emoji} [{sensor}] 状态: {status}")
elif topic.endswith("/temperature"):
temp = data.get("temperature")
sensor = data.get("sensor_id", "unknown")
seq = data.get("sequence", "?")
alert = " ⚠️ 高温告警!" if temp and temp > ALERT_THRESHOLD else ""
temp_str = f"{temp}°C" if temp else "N/A"
print(f"🌡️ [{sensor}] #{seq} | {temp_str}{alert}")
def main():
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=60)
try:
print("📊 温度监控终端启动 (Ctrl+C 退出)")
client.loop_forever()
except KeyboardInterrupt:
print("\n🛑 监控停止")
client.disconnect()
if __name__ == "__main__":
main()
三、运行演示
# 终端 1:启动 Broker
mosquitto -v
# 终端 2:启动订阅者
python3 mqtt_subscriber.py
# 终端 3:启动发布者
python3 mqtt_publisher.py
# 终端 4(可选):CLI 工具查看
mosquitto_sub -t "home/#" -v
预期输出(订阅者端)
📊 温度监控终端启动
🟢 [sensor_desktop] 状态: online
🌡️ [sensor_desktop] #1 | 25.43°C
🌡️ [sensor_desktop] #2 | 26.87°C
🌡️ [sensor_desktop] #3 | 29.12°C ⚠️ 高温告警!
按 Ctrl+C 停止发布者后:
🔴 [sensor_desktop] 状态: offline → reason: connection lost
关键点
- QoS 1:保证消息到达但可能重复(温度场景容忍重复)
- 遗嘱消息:无需心跳超时等待,Broker 立刻通知订阅者设备离线
- Retain:状态主题用 retain,新订阅者立即可得最新上线状态
- 通配符:
home/+/temperature匹配所有房间的传感器