Python MQTT 客户端:温度传感器发布与订阅

知识库
知识库文档
/tech-stacks/mqtt-protocol/examples/Python MQTT 客户端:温度传感器发布与订阅.md

文档

MQTT Python 客户端:温度传感器发布与订阅

目标

使用 paho-mqtt 库实现一个模拟温度传感器的 Publisher 和监控终端的 Subscriber,理解 MQTT 发布/订阅模式、QoS 和遗嘱消息。

前提

# 启动本地 Broker
mosquitto -v -p 1883 &;

# 安装 Python 库
pip install paho-mqtt

一、温度发布者(Sensor)

#!/usr/bin/env python3
"""mqtt_publisher.py — 模拟温度传感器,定期发布数据"""

import paho.mqtt.client as mqtt
import json
import time
import random
import platform

BROKER = "localhost"
PORT = 1883
TOPIC_TEMP = "home/sensor/temperature"
TOPIC_STATUS = "home/sensor/status"
CLIENT_ID = f"sensor_{platform.node()}"

# 遗嘱消息:传感器异常断线时 Broker 自动发布
WILL_TOPIC = "home/sensor/status"
WILL_MESSAGE = json.dumps({"status": "offline", "reason": "connection lost"})

def on_connect(client, userdata, flags, rc):
    """连接回调"""
    status_codes = {
        0: "连接成功",
        1: "协议版本错误",
        2: "Client ID 被拒",
        3: "服务器不可用",
        4: "用户名/密码错误",
        5: "未授权"
    }
    print(f"[MQTT] {status_codes.get(rc, f'未知错误 {rc}')}")
    # 发布上线状态
    client.publish(TOPIC_STATUS, json.dumps({"status": "online"}), qos=1, retain=True)

def on_publish(client, userdata, mid):
    """发布完成回调(QoS 1/2 时有效)"""
    print(f"  ✅ 消息已确认 (mid={mid})")

def simulate_temperature():
    """模拟温度传感器读数"""
    base = 25.0
    drift = 5 * random.random() - 2.5
    noise = random.gauss(0, 0.3)
    return round(base + drift + noise, 2)

def main():
    client = mqtt.Client(
        client_id=CLIENT_ID,
        protocol=mqtt.MQTTv5  # MQTT 5.0
    )

    # 设置遗嘱(MQTT 5)
    client.will_set(WILL_TOPIC, WILL_MESSAGE, qos=1, retain=True)

    # 绑定回调
    client.on_connect = on_connect
    client.on_publish = on_publish

    # 连接 Broker
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_start()  # 后台网络线程

    try:
        count = 0
        while True:
            count += 1
            temp = simulate_temperature()
            payload = json.dumps({
                "sensor_id": CLIENT_ID,
                "temperature": temp,
                "unit": "celsius",
                "timestamp": int(time.time()),
                "sequence": count
            })

            # 发布温度(QoS 1:至少一次)
            info = client.publish(TOPIC_TEMP, payload, qos=1)

            print(f"📡 发布 #{count}: {temp}°C | QoS=1 | mid={info.mid}")
            time.sleep(2)

    except KeyboardInterrupt:
        print("\n🛑 传感器停止")
        # 优雅下线
        client.publish(TOPIC_STATUS, json.dumps({"status": "offline", "shutdown": True}),
                       qos=1, retain=True)
        client.loop_stop()
        client.disconnect()

if __name__ == "__main__":
    main()

二、温度订阅者(Monitor)

#!/usr/bin/env python3
"""mqtt_subscriber.py — 温度监控终端"""

import paho.mqtt.client as mqtt
import json

BROKER = "localhost"
PORT = 1883
# 使用通配符订阅所有传感器
TOPIC_TEMP = "home/+/temperature"     # + 单层通配符
TOPIC_STATUS = "home/+/status"

ALERT_THRESHOLD = 28.0  # 温度告警阈值

def on_connect(client, userdata, flags, rc):
    print(f"[MQTT] 已连接到 Broker (rc={rc})")
    client.subscribe([(TOPIC_TEMP, 1), (TOPIC_STATUS, 1)])
    print(f"[MQTT] 订阅: {TOPIC_TEMP}, {TOPIC_STATUS}")

def on_message(client, userdata, msg):
    """消息回调"""
    try:
        data = json.loads(msg.payload)
    except json.JSONDecodeError:
        print(f"⚠️ 无法解析: {msg.topic} → {msg.payload}")
        return

    topic = msg.topic

    if topic.endswith("/status"):
        status = data.get("status", "unknown")
        sensor = topic.split("/")[1]
        emoji = "🟢" if status == "online" else "🔴"
        print(f"{emoji} [{sensor}] 状态: {status}")

    elif topic.endswith("/temperature"):
        temp = data.get("temperature")
        sensor = data.get("sensor_id", "unknown")
        seq = data.get("sequence", "?")

        alert = " ⚠️ 高温告警!" if temp and temp > ALERT_THRESHOLD else ""
        temp_str = f"{temp}°C" if temp else "N/A"
        print(f"🌡️  [{sensor}] #{seq} | {temp_str}{alert}")

def main():
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(BROKER, PORT, keepalive=60)

    try:
        print("📊 温度监控终端启动 (Ctrl+C 退出)")
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n🛑 监控停止")
        client.disconnect()

if __name__ == "__main__":
    main()

三、运行演示

# 终端 1:启动 Broker
mosquitto -v

# 终端 2:启动订阅者
python3 mqtt_subscriber.py

# 终端 3:启动发布者
python3 mqtt_publisher.py

# 终端 4(可选):CLI 工具查看
mosquitto_sub -t "home/#" -v

预期输出(订阅者端)

📊 温度监控终端启动
🟢 [sensor_desktop] 状态: online
🌡️ [sensor_desktop] #1 | 25.43°C
🌡️ [sensor_desktop] #2 | 26.87°C
🌡️ [sensor_desktop] #3 | 29.12°C ⚠️ 高温告警!

按 Ctrl+C 停止发布者后:

🔴 [sensor_desktop] 状态: offline → reason: connection lost

关键点

  • QoS 1:保证消息到达但可能重复(温度场景容忍重复)
  • 遗嘱消息:无需心跳超时等待,Broker 立刻通知订阅者设备离线
  • Retain:状态主题用 retain,新订阅者立即可得最新上线状态
  • 通配符home/+/temperature 匹配所有房间的传感器

信息

路径
/tech-stacks/mqtt-protocol/examples/Python MQTT 客户端:温度传感器发布与订阅.md
更新时间
2026/5/31