MQTT 协议

技术栈
其他
mqtt物联网消息协议pub-sub轻量级qos

概览

MQTT 协议技术栈概览

MQTT(Message Queuing Telemetry Transport)是 OASIS 标准化的轻量级发布/订阅消息传输协议,由 IBM 的 Andy Stanford-Clark 和 Arcom 的 Arlen Nipper 于 1999 年发明。专为低带宽、高延迟、不可靠网络设计,已成为 IoT 事实标准协议。

解决什么问题

  • 设备资源受限:最小报文仅 2 字节头部,可在 8 位 MCU 上运行
  • 网络不稳定:支持断线自动重连、遗嘱消息(LWT)、持久会话
  • 大规模连接:单 Broker 可支撑百万级并发连接(如 EMQX)
  • 一对多通信:发布/订阅模式解耦发送方与接收方

关键特性

  • QoS 三级:0 至多一次、1 至少一次、2 恰好一次
  • 遗嘱/保留消息:客户端异常断线自动发布遗言,保留消息新订阅者立即可得
  • 主题通配符+ 单层通配、# 多层通配,灵活匹配
  • 5.0 新特性:会话过期、共享订阅、请求/响应模式、用户属性
  • 多传输层:TCP、WebSocket、QUIC 均可承载 MQTT

安装

MQTT 协议环境搭建指南

1. 环境准备

MQTT 是协议规范,不绑定特定实现。以下是常用开发环境:

用途 推荐工具
Broker Mosquitto / EMQX / HiveMQ / VerneMQ
客户端 CLI mosquitto_pub / mosquitto_sub
Python 库 paho-mqtt
嵌入式 C 库 paho.mqtt.embedded-c / wolfMQTT
图形化工具 MQTTX / MQTT Explorer
云端 Broker AWS IoT Core / Azure IoT Hub / EMQX Cloud

基础依赖:网络可达的 Broker 地址(本地或云端均可)。

2. 安装步骤

本地 Broker:Mosquitto

# Ubuntu / Debian
sudo apt install mosquitto mosquitto-clients

# macOS
brew install mosquitto

# Windows
# 下载:https://mosquitto.org/download/
# 安装后默认作为服务运行

# 启动 Broker(Linux/macOS)
mosquitto -v -p 1883

# 测试
mosquitto_sub -t "test/topic" -v &;
mosquitto_pub -t "test/topic" -m "Hello MQTT"

Python 客户端

pip install paho-mqtt

嵌入式 C 客户端

git clone https://github.com/eclipse-paho/paho.mqtt.embedded-c.git
# 复制 MQTTPacket/src/* 到项目中
# 在 mqtt_transport.c 中实现网络层收发

图形化工具 MQTTX

# 下载桌面客户端:https://mqttx.app/
# 或通过 Homebrew (macOS)
brew install --cask mqttx

3. 常见安装问题

问题 解决方案
Mosquitto 连接拒绝 检查防火墙:sudo ufw allow 1883;查看配置 /etc/mosquitto/mosquitto.confallow_anonymous true
端口 1883 被占用 lsof -i :1883 查找占用进程;改用 8883(TLS)或 8080(WebSocket)
TLS 证书错误 测试阶段可在 mosquitto_pub -p 8883 --insecure 跳过验证
paho-mqtt 连接超时 检查 Broker IP/端口是否正确;防火墙;网络代理
QoS 2 消息堆积 Broker 重启后会重新投递未完成的消息,检查客户端是否正确 ACK

示例

MQTT 协议进阶:QoS 深度解析与安全通信(TLS)

目标

深入理解 MQTT QoS 机制的消息流程,配置 TLS/SSL 加密通信,使用 MQTT 5.0 新特性。

一、QoS 深度解析

QoS 0 — 至多一次 (Fire and Forget)

发布者 →  PUBLISH        → Broker → PUBLISH → 订阅者
       (不等待确认)              (不重试)
client.publish("sensor/temp", "25.5", qos=0)
# 优点:最快、开销最小
# 适用:高频传感器数据,容忍丢帧(如每秒 100 次的加速度采样)

QoS 1 — 至少一次 (Acknowledged)

发布者 →  PUBLISH (Packet ID=1234)
        ←  PUBACK (Packet ID=1234)
Broker →  PUBLISH (Packet ID=5678)
        ←  PUBACK (Packet ID=5678)    订阅者
# 未收到 PUBACK 则重发
def on_publish(client, userdata, mid):
    print(f"QoS 1 消息 {mid} 已确认")

client.on_publish = on_publish
client.publish("alert/fire", "HIGH", qos=1)
# 优点:可靠投递
# 缺点:可能重复(订阅者需幂等处理)

QoS 2 — 恰好一次 (Four-Way Handshake)

发布者 →  PUBLISH (ID=1)
        ←  PUBREC (ID=1)
发布者 →  PUBREL (ID=1)
        ←  PUBCOMP (ID=1)
# 四步握手保证「恰好一次」
client.publish("payment/confirmed", json.dumps(order), qos=2)
# 优点:不丢不重
# 缺点:最慢,适合金融/控制指令
# 注意:MQTT 5.0 优化了 QoS 2 握手流程

QoS 选择建议

场景 推荐 QoS 原因
温度传感器(1Hz) QoS 1 偶尔重复可接受
加速度计(100Hz) QoS 0 速度优先
支付确认 QoS 2 零容忍重复
设备控制命令 QoS 2 必须恰好执行一次
状态更新 (Retained) QoS 1 新订阅者需最新状态

二、TLS/SSL 安全通信

生成自签名证书

# CA 私钥
openssl genrsa -out ca.key 2048

# CA 证书
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
    -subj "/C=CN/ST=Guangdong/O=IoT Company/CN=My IoT CA"

# Broker 私钥
openssl genrsa -out broker.key 2048

# Broker 证书请求
openssl req -new -key broker.key -out broker.csr \
    -subj "/CN=mqtt.local"

# 签发 Broker 证书
openssl x509 -req -in broker.csr -CA ca.crt -CAkey ca.key \
    -CAcreateserial -out broker.crt -days 365

Mosquitto TLS 配置

# /etc/mosquitto/conf.d/tls.conf
listener 8883
cafile   /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/broker.crt
keyfile  /etc/mosquitto/certs/broker.key

require_certificate true       # 要求客户端证书
tls_version tlsv1.2

# 同时保留内部非 TLS 端口
listener 1883 localhost        # 仅本地可用

Python 客户端 TLS

import ssl
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="secure_sensor")

# 单向 TLS(验证 Broker)
client.tls_set(
    ca_certs="ca.crt",           # CA 证书
    certfile=None,               # 无客户端证书
    keyfile=None,
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLSv1_2
)

# 双向 TLS(客户端也出示证书)
# client.tls_set(
#     ca_certs="ca.crt",
#     certfile="client.crt",
#     keyfile="client.key",
#     cert_reqs=ssl.CERT_REQUIRED
# )

client.connect("mqtt.local", 8883)

三、MQTT 5.0 新特性实战

# MQTT 5.0 特性示例
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

client = mqtt.Client(protocol=mqtt.MQTTv5)

# ── 会话过期 ──
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = 3600  # 断线后保留会话 1 小时

client.connect("localhost", properties=props)

# ── 消息过期 ──
pub_props = Properties(PacketTypes.PUBLISH)
pub_props.MessageExpiryInterval = 60  # 消息 60 秒后自动丢弃

client.publish("temp/urgent", "42°C", qos=1, properties=pub_props)

# ── 请求/响应模式 ──
# 发布者
response_props = Properties(PacketTypes.PUBLISH)
response_props.ResponseTopic = "cmd/response/xyz"
response_props.CorrelationData = b"req-001"

client.publish("cmd/restart", properties=response_props)

# ── 用户属性(自定义元数据) ──
meta_props = Properties(PacketTypes.PUBLISH)
meta_props.UserProperty = [
    ("sensor_type", "temperature"),
    ("firmware_version", "2.4.1"),
    ("priority", "high")
]
client.publish("data", "25.0", properties=meta_props)

四、关键点

  • QoS 降级:发布者 QoS 2 → Broker 可为订阅者降为 QoS 1 或 0(取决于订阅 QoS)
  • TLS 开销:ESP32 上 TLS 握手约 1-2 秒,需考虑电池功耗
  • MQTT 5.0 vs 3.1.1:5.0 新增 30+ 特性但 Broker 兼容成本高,嵌入式首选 3.1.1
  • 会话保持clean_start=false + SessionExpiryInterval 确保断线后消息不丢失

MQTT Python 客户端:温度传感器发布与订阅

目标

使用 paho-mqtt 库实现一个模拟温度传感器的 Publisher 和监控终端的 Subscriber,理解 MQTT 发布/订阅模式、QoS 和遗嘱消息。

前提

# 启动本地 Broker
mosquitto -v -p 1883 &;

# 安装 Python 库
pip install paho-mqtt

一、温度发布者(Sensor)

#!/usr/bin/env python3
"""mqtt_publisher.py — 模拟温度传感器,定期发布数据"""

import paho.mqtt.client as mqtt
import json
import time
import random
import platform

BROKER = "localhost"
PORT = 1883
TOPIC_TEMP = "home/sensor/temperature"
TOPIC_STATUS = "home/sensor/status"
CLIENT_ID = f"sensor_{platform.node()}"

# 遗嘱消息:传感器异常断线时 Broker 自动发布
WILL_TOPIC = "home/sensor/status"
WILL_MESSAGE = json.dumps({"status": "offline", "reason": "connection lost"})

def on_connect(client, userdata, flags, rc):
    """连接回调"""
    status_codes = {
        0: "连接成功",
        1: "协议版本错误",
        2: "Client ID 被拒",
        3: "服务器不可用",
        4: "用户名/密码错误",
        5: "未授权"
    }
    print(f"[MQTT] {status_codes.get(rc, f'未知错误 {rc}')}")
    # 发布上线状态
    client.publish(TOPIC_STATUS, json.dumps({"status": "online"}), qos=1, retain=True)

def on_publish(client, userdata, mid):
    """发布完成回调(QoS 1/2 时有效)"""
    print(f"  ✅ 消息已确认 (mid={mid})")

def simulate_temperature():
    """模拟温度传感器读数"""
    base = 25.0
    drift = 5 * random.random() - 2.5
    noise = random.gauss(0, 0.3)
    return round(base + drift + noise, 2)

def main():
    client = mqtt.Client(
        client_id=CLIENT_ID,
        protocol=mqtt.MQTTv5  # MQTT 5.0
    )

    # 设置遗嘱(MQTT 5)
    client.will_set(WILL_TOPIC, WILL_MESSAGE, qos=1, retain=True)

    # 绑定回调
    client.on_connect = on_connect
    client.on_publish = on_publish

    # 连接 Broker
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_start()  # 后台网络线程

    try:
        count = 0
        while True:
            count += 1
            temp = simulate_temperature()
            payload = json.dumps({
                "sensor_id": CLIENT_ID,
                "temperature": temp,
                "unit": "celsius",
                "timestamp": int(time.time()),
                "sequence": count
            })

            # 发布温度(QoS 1:至少一次)
            info = client.publish(TOPIC_TEMP, payload, qos=1)

            print(f"📡 发布 #{count}: {temp}°C | QoS=1 | mid={info.mid}")
            time.sleep(2)

    except KeyboardInterrupt:
        print("\n🛑 传感器停止")
        # 优雅下线
        client.publish(TOPIC_STATUS, json.dumps({"status": "offline", "shutdown": True}),
                       qos=1, retain=True)
        client.loop_stop()
        client.disconnect()

if __name__ == "__main__":
    main()

二、温度订阅者(Monitor)

#!/usr/bin/env python3
"""mqtt_subscriber.py — 温度监控终端"""

import paho.mqtt.client as mqtt
import json

BROKER = "localhost"
PORT = 1883
# 使用通配符订阅所有传感器
TOPIC_TEMP = "home/+/temperature"     # + 单层通配符
TOPIC_STATUS = "home/+/status"

ALERT_THRESHOLD = 28.0  # 温度告警阈值

def on_connect(client, userdata, flags, rc):
    print(f"[MQTT] 已连接到 Broker (rc={rc})")
    client.subscribe([(TOPIC_TEMP, 1), (TOPIC_STATUS, 1)])
    print(f"[MQTT] 订阅: {TOPIC_TEMP}, {TOPIC_STATUS}")

def on_message(client, userdata, msg):
    """消息回调"""
    try:
        data = json.loads(msg.payload)
    except json.JSONDecodeError:
        print(f"⚠️ 无法解析: {msg.topic} → {msg.payload}")
        return

    topic = msg.topic

    if topic.endswith("/status"):
        status = data.get("status", "unknown")
        sensor = topic.split("/")[1]
        emoji = "🟢" if status == "online" else "🔴"
        print(f"{emoji} [{sensor}] 状态: {status}")

    elif topic.endswith("/temperature"):
        temp = data.get("temperature")
        sensor = data.get("sensor_id", "unknown")
        seq = data.get("sequence", "?")

        alert = " ⚠️ 高温告警!" if temp and temp > ALERT_THRESHOLD else ""
        temp_str = f"{temp}°C" if temp else "N/A"
        print(f"🌡️  [{sensor}] #{seq} | {temp_str}{alert}")

def main():
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(BROKER, PORT, keepalive=60)

    try:
        print("📊 温度监控终端启动 (Ctrl+C 退出)")
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n🛑 监控停止")
        client.disconnect()

if __name__ == "__main__":
    main()

三、运行演示

# 终端 1:启动 Broker
mosquitto -v

# 终端 2:启动订阅者
python3 mqtt_subscriber.py

# 终端 3:启动发布者
python3 mqtt_publisher.py

# 终端 4(可选):CLI 工具查看
mosquitto_sub -t "home/#" -v

预期输出(订阅者端)

📊 温度监控终端启动
🟢 [sensor_desktop] 状态: online
🌡️ [sensor_desktop] #1 | 25.43°C
🌡️ [sensor_desktop] #2 | 26.87°C
🌡️ [sensor_desktop] #3 | 29.12°C ⚠️ 高温告警!

按 Ctrl+C 停止发布者后:

🔴 [sensor_desktop] 状态: offline → reason: connection lost

关键点

  • QoS 1:保证消息到达但可能重复(温度场景容忍重复)
  • 遗嘱消息:无需心跳超时等待,Broker 立刻通知订阅者设备离线
  • Retain:状态主题用 retain,新订阅者立即可得最新上线状态
  • 通配符home/+/temperature 匹配所有房间的传感器

教程

MQTT 协议深度应用指南

本章目标

从协议原理到生产级部署,全面掌握 MQTT 在物联网系统中的关键设计模式。


1. MQTT 协议核心设计

为什么不用 HTTP?

对比 HTTP MQTT
消息模型 请求/响应 发布/订阅
发起方 仅客户端 双向推送
开销 Header ~800B Header 2B (min)
连接保持 短连接 长连接
适合设备 服务器 传感器/嵌入式

2. 主题设计最佳实践

推荐层级结构

home/{room}/{device}/{metric}
home/living-room/temperature/state
home/living-room/light/set
home/living-room/light/state

factory/line-1/motor-3/speed
factory/line-1/motor-3/status

device/{device_id}/telemetry
device/{device_id}/command
device/{device_id}/response

设计原则

  1. 在主题里放动态值:sensor/25.5/temp ❌ → sensor/temperature ✅(值放在 payload)
  2. 使用单数/复数区分:device/ 放元数据,devices/ 放列表
  3. 命令用 /set,状态用 /state

3. Broker 选型

Broker 特点 适合
Mosquitto 轻量 C 实现,单机百万连接 边缘/小规模
EMQX Erlang,分布式集群 云原生/大规模
VerneMQ Erlang,Apache 2.0 自建集群
HiveMQ Java,企业级 商业项目
AWS IoT Core 托管,与 AWS 生态集成 AWS 用户

4. 生产部署关键问题

QoS 流控

QoS 1 消息堆积 → Broker 内存暴涨
解决:
1. 限制飞行窗口 (max_inflight_messages)
2. 设置消息过期 (Message Expiry)
3. 监控队列深度 → 告警

遗嘱消息 (LWT) 正确使用

# 连接时设置遗嘱
client.will_set(
    "device/status",
    json.dumps({"status": "offline", "ts": int(time.time())}),
    qos=1,
    retain=True
)

# 上线后清除遗嘱状态
client.publish("device/status",
    json.dumps({"status": "online"}),
    qos=1, retain=True
)

5. 安全架构

设备 (证书) ──TLS──> Broker ──> 应用 (JWT 鉴权)
                                ↑
                         ACL (按主题授权)
  • 设备证书:工厂预烧 X.509
  • ACLdevice/{id}/# 只允许该设备访问
  • JWT:后端服务用短期 Token

6. 大规模架构

设备 → MQTT Broker (EMQX 集群) → Kafka → Flink/Spark → DB
                                 ↓
                          规则引擎(告警/动作)

思考题

  1. MQTT 5.0 的共享订阅解决了什么问题?
  2. 如何设计主题来支持 OTA 固件升级?
  3. Broker 集群如何保证跨节点的消息顺序?

参考资料

  1. [1] OASIS. MQTT Version 5.0 OASIS Standard. 2019. https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
  2. [2] OASIS. MQTT Version 3.1.1 OASIS Standard. 2014. https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
  3. [3] HiveMQ. MQTT Essentials. 2024. https://www.hivemq.com/mqtt-essentials/
  4. [4] Dominik Obermaier, Jens Deters. Building IoT Applications with MQTT. 2023.
  5. [5] Eclipse Foundation. Eclipse Paho MQTT Python Client. 2024. https://pypi.org/project/paho-mqtt/