MQTT 协议进阶:QoS 深度解析与安全通信(TLS)
目标
深入理解 MQTT QoS 机制的消息流程,配置 TLS/SSL 加密通信,使用 MQTT 5.0 新特性。
一、QoS 深度解析
QoS 0 — 至多一次 (Fire and Forget)
发布者 → PUBLISH → Broker → PUBLISH → 订阅者
(不等待确认) (不重试)
client.publish("sensor/temp", "25.5", qos=0)
# 优点:最快、开销最小
# 适用:高频传感器数据,容忍丢帧(如每秒 100 次的加速度采样)
QoS 1 — 至少一次 (Acknowledged)
发布者 → PUBLISH (Packet ID=1234)
← PUBACK (Packet ID=1234)
Broker → PUBLISH (Packet ID=5678)
← PUBACK (Packet ID=5678) 订阅者
# 未收到 PUBACK 则重发
def on_publish(client, userdata, mid):
print(f"QoS 1 消息 {mid} 已确认")
client.on_publish = on_publish
client.publish("alert/fire", "HIGH", qos=1)
# 优点:可靠投递
# 缺点:可能重复(订阅者需幂等处理)
QoS 2 — 恰好一次 (Four-Way Handshake)
发布者 → PUBLISH (ID=1)
← PUBREC (ID=1)
发布者 → PUBREL (ID=1)
← PUBCOMP (ID=1)
# 四步握手保证「恰好一次」
client.publish("payment/confirmed", json.dumps(order), qos=2)
# 优点:不丢不重
# 缺点:最慢,适合金融/控制指令
# 注意:MQTT 5.0 优化了 QoS 2 握手流程
QoS 选择建议
| 场景 |
推荐 QoS |
原因 |
| 温度传感器(1Hz) |
QoS 1 |
偶尔重复可接受 |
| 加速度计(100Hz) |
QoS 0 |
速度优先 |
| 支付确认 |
QoS 2 |
零容忍重复 |
| 设备控制命令 |
QoS 2 |
必须恰好执行一次 |
| 状态更新 (Retained) |
QoS 1 |
新订阅者需最新状态 |
二、TLS/SSL 安全通信
生成自签名证书
openssl genrsa -out ca.key 2048
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
-subj "/C=CN/ST=Guangdong/O=IoT Company/CN=My IoT CA"
openssl genrsa -out broker.key 2048
openssl req -new -key broker.key -out broker.csr \
-subj "/CN=mqtt.local"
openssl x509 -req -in broker.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out broker.crt -days 365
Mosquitto TLS 配置
listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/broker.crt
keyfile /etc/mosquitto/certs/broker.key
require_certificate true
tls_version tlsv1.2
listener 1883 localhost
Python 客户端 TLS
import ssl
import paho.mqtt.client as mqtt
client = mqtt.Client(client_id="secure_sensor")
# 单向 TLS(验证 Broker)
client.tls_set(
ca_certs="ca.crt", # CA 证书
certfile=None, # 无客户端证书
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2
)
# 双向 TLS(客户端也出示证书)
# client.tls_set(
# ca_certs="ca.crt",
# certfile="client.crt",
# keyfile="client.key",
# cert_reqs=ssl.CERT_REQUIRED
# )
client.connect("mqtt.local", 8883)
三、MQTT 5.0 新特性实战
# MQTT 5.0 特性示例
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client(protocol=mqtt.MQTTv5)
# ── 会话过期 ──
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = 3600 # 断线后保留会话 1 小时
client.connect("localhost", properties=props)
# ── 消息过期 ──
pub_props = Properties(PacketTypes.PUBLISH)
pub_props.MessageExpiryInterval = 60 # 消息 60 秒后自动丢弃
client.publish("temp/urgent", "42°C", qos=1, properties=pub_props)
# ── 请求/响应模式 ──
# 发布者
response_props = Properties(PacketTypes.PUBLISH)
response_props.ResponseTopic = "cmd/response/xyz"
response_props.CorrelationData = b"req-001"
client.publish("cmd/restart", properties=response_props)
# ── 用户属性(自定义元数据) ──
meta_props = Properties(PacketTypes.PUBLISH)
meta_props.UserProperty = [
("sensor_type", "temperature"),
("firmware_version", "2.4.1"),
("priority", "high")
]
client.publish("data", "25.0", properties=meta_props)
四、关键点
- QoS 降级:发布者 QoS 2 → Broker 可为订阅者降为 QoS 1 或 0(取决于订阅 QoS)
- TLS 开销:ESP32 上 TLS 握手约 1-2 秒,需考虑电池功耗
- MQTT 5.0 vs 3.1.1:5.0 新增 30+ 特性但 Broker 兼容成本高,嵌入式首选 3.1.1
- 会话保持:
clean_start=false + SessionExpiryInterval 确保断线后消息不丢失
MQTT Python 客户端:温度传感器发布与订阅
目标
使用 paho-mqtt 库实现一个模拟温度传感器的 Publisher 和监控终端的 Subscriber,理解 MQTT 发布/订阅模式、QoS 和遗嘱消息。
前提
mosquitto -v -p 1883 &
pip install paho-mqtt
一、温度发布者(Sensor)
#!/usr/bin/env python3
"""mqtt_publisher.py — 模拟温度传感器,定期发布数据"""
import paho.mqtt.client as mqtt
import json
import time
import random
import platform
BROKER = "localhost"
PORT = 1883
TOPIC_TEMP = "home/sensor/temperature"
TOPIC_STATUS = "home/sensor/status"
CLIENT_ID = f"sensor_{platform.node()}"
# 遗嘱消息:传感器异常断线时 Broker 自动发布
WILL_TOPIC = "home/sensor/status"
WILL_MESSAGE = json.dumps({"status": "offline", "reason": "connection lost"})
def on_connect(client, userdata, flags, rc):
"""连接回调"""
status_codes = {
0: "连接成功",
1: "协议版本错误",
2: "Client ID 被拒",
3: "服务器不可用",
4: "用户名/密码错误",
5: "未授权"
}
print(f"[MQTT] {status_codes.get(rc, f'未知错误 {rc}')}")
# 发布上线状态
client.publish(TOPIC_STATUS, json.dumps({"status": "online"}), qos=1, retain=True)
def on_publish(client, userdata, mid):
"""发布完成回调(QoS 1/2 时有效)"""
print(f" ✅ 消息已确认 (mid={mid})")
def simulate_temperature():
"""模拟温度传感器读数"""
base = 25.0
drift = 5 * random.random() - 2.5
noise = random.gauss(0, 0.3)
return round(base + drift + noise, 2)
def main():
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5 # MQTT 5.0
)
# 设置遗嘱(MQTT 5)
client.will_set(WILL_TOPIC, WILL_MESSAGE, qos=1, retain=True)
# 绑定回调
client.on_connect = on_connect
client.on_publish = on_publish
# 连接 Broker
client.connect(BROKER, PORT, keepalive=60)
client.loop_start() # 后台网络线程
try:
count = 0
while True:
count += 1
temp = simulate_temperature()
payload = json.dumps({
"sensor_id": CLIENT_ID,
"temperature": temp,
"unit": "celsius",
"timestamp": int(time.time()),
"sequence": count
})
# 发布温度(QoS 1:至少一次)
info = client.publish(TOPIC_TEMP, payload, qos=1)
print(f"📡 发布 #{count}: {temp}°C | QoS=1 | mid={info.mid}")
time.sleep(2)
except KeyboardInterrupt:
print("\n🛑 传感器停止")
# 优雅下线
client.publish(TOPIC_STATUS, json.dumps({"status": "offline", "shutdown": True}),
qos=1, retain=True)
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()
二、温度订阅者(Monitor)
#!/usr/bin/env python3
"""mqtt_subscriber.py — 温度监控终端"""
import paho.mqtt.client as mqtt
import json
BROKER = "localhost"
PORT = 1883
# 使用通配符订阅所有传感器
TOPIC_TEMP = "home/+/temperature" # + 单层通配符
TOPIC_STATUS = "home/+/status"
ALERT_THRESHOLD = 28.0 # 温度告警阈值
def on_connect(client, userdata, flags, rc):
print(f"[MQTT] 已连接到 Broker (rc={rc})")
client.subscribe([(TOPIC_TEMP, 1), (TOPIC_STATUS, 1)])
print(f"[MQTT] 订阅: {TOPIC_TEMP}, {TOPIC_STATUS}")
def on_message(client, userdata, msg):
"""消息回调"""
try:
data = json.loads(msg.payload)
except json.JSONDecodeError:
print(f"⚠️ 无法解析: {msg.topic} → {msg.payload}")
return
topic = msg.topic
if topic.endswith("/status"):
status = data.get("status", "unknown")
sensor = topic.split("/")[1]
emoji = "🟢" if status == "online" else "🔴"
print(f"{emoji} [{sensor}] 状态: {status}")
elif topic.endswith("/temperature"):
temp = data.get("temperature")
sensor = data.get("sensor_id", "unknown")
seq = data.get("sequence", "?")
alert = " ⚠️ 高温告警!" if temp and temp > ALERT_THRESHOLD else ""
temp_str = f"{temp}°C" if temp else "N/A"
print(f"🌡️ [{sensor}] #{seq} | {temp_str}{alert}")
def main():
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=60)
try:
print("📊 温度监控终端启动 (Ctrl+C 退出)")
client.loop_forever()
except KeyboardInterrupt:
print("\n🛑 监控停止")
client.disconnect()
if __name__ == "__main__":
main()
三、运行演示
mosquitto -v
python3 mqtt_subscriber.py
python3 mqtt_publisher.py
mosquitto_sub -t "home/#" -v
预期输出(订阅者端)
📊 温度监控终端启动
🟢 [sensor_desktop] 状态: online
🌡️ [sensor_desktop] #1 | 25.43°C
🌡️ [sensor_desktop] #2 | 26.87°C
🌡️ [sensor_desktop] #3 | 29.12°C ⚠️ 高温告警!
按 Ctrl+C 停止发布者后:
🔴 [sensor_desktop] 状态: offline → reason: connection lost
关键点
- QoS 1:保证消息到达但可能重复(温度场景容忍重复)
- 遗嘱消息:无需心跳超时等待,Broker 立刻通知订阅者设备离线
- Retain:状态主题用 retain,新订阅者立即可得最新上线状态
- 通配符:
home/+/temperature 匹配所有房间的传感器