IoT 监控平台实战

知识库
知识库文档
/tech-stacks/influxdb/tutorial/IoT 监控平台实战.md

文档

InfluxDB 从零到实战:IoT 监控平台

1. 背景与概念

1.1 时序数据特征

时序数据 = 时间戳 + 标签(维度)+ 测量值:

温度读数:  timestamp=10:00, device=ESP32-001, location=lab, value=23.5
CPU 指标:  timestamp=10:01, host=server01, cpu_usage=67.3%

特点是写多读少、时间有序、不可变(过去数据不会更新)。

1.2 行协议拆解

temperature,location=lab,device=esp32 value=23.5,humidity=55 1696128000
└─┬──────┘ └───────┬──────────────┘ └──────────┬─────────┘ └────┬──┘
 measurement     tag set                    field set        timestamp
部分 必填 说明
measurement 类表名
tag set 索引维度,字符串类型
field set 测量值,支持 float/int/bool/string
timestamp 纳秒精度 Unix 时间戳(默认服务器时间)

2. 分步实战:构建 IoT 环境监控系统

场景

大学实验室部署多个传感器节点,采集温湿度。需要实时仪表盘、历史趋势和告警。

步骤一:架构设计

ESP32 传感器 ──MQTT──▶ Telegraf ──写入──▶ InfluxDB ──查询──▶ Grafana
                                          │
                                          └──▶ Kapacitor (告警)

步骤二:模拟传感器数据

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import time, random, uuid

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

sensors = {
    'lab-401': {'location': '四楼实验室', 'floor': '4'},
    'lab-502': {'location': '五楼实验室', 'floor': '5'},
    'server-room': {'location': '服务器机房', 'floor': '1'}
}

while True:
    for sensor_id, tags in sensors.items():
        # 写入温度
        temp_point = (
            Point("environment")
            .tag("sensor_id", sensor_id)
            .tag("location", tags['location'])
            .field("temperature", round(random.uniform(18, 28), 2))
            .field("humidity", round(random.uniform(30, 70), 2))
        )
        write_api.write(bucket="iot", org="my-org", record=temp_point)
    time.sleep(5)

步骤三:核心查询

// 1. 实时仪表盘:最近 5 分钟平均温度
from(bucket: "iot")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> group(columns: ["sensor_id"])
  |> aggregateWindow(every: 10s, fn: mean)

// 2. 过去 24 小时趋势(按小时聚合)
from(bucket: "iot")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> aggregateWindow(every: 1h, fn: mean)
  |> group(columns: ["sensor_id"])

// 3. 热力图:哪个传感器平均温度最高
from(bucket: "iot")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> group(columns: ["sensor_id"])
  |> mean()
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 5)

// 4. 异常检测(温度超过阈值)
from(bucket: "iot")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> filter(fn: (r) => r._value > 30.0)   // 超过 30 度
  |> keep(columns: ["_time", "sensor_id", "_value"])

// 5. 计算 delta(变化速率)
from(bucket: "iot")
  |> range(start: -30m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> derivative(unit: 1m)

步骤四:设置 Grafana 可视化

# 1. 启动 Grafana
docker run -d -p 3000:3000 --name grafana grafana/grafana

# 2. 访问 http://localhost:3000(默认 admin/admin)
# 3. Add data source → InfluxDB
#    - Query Language: Flux
#    - URL: http://influxdb:8086
#    - Organization: my-org
#    - Token: my-token
# 4. 创建 Dashboard → Add Panel → 粘贴 Flux 查询

步骤五:Downsampling 下采样

// 创建下采样任务:每小时聚合一次,保留到长期存储桶
option task = {
    name: "hourly_downsample",
    every: 1h,
}

from(bucket: "iot")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> aggregateWindow(every: 1h, fn: mean)
  |> to(bucket: "iot_downsampled", org: "my-org")

3. 思考题

  1. 1000 个传感器每秒上报一次数据,如何设计 Retention Policy 避免磁盘爆炸?
  2. Flux 的 aggregateWindowwindow 有什么区别?何时用哪个?
  3. InfluxDB 2.x 使用 _field_value 等特殊列。如果一条记录有多个 field(温度+湿度),它们如何存储和查询?

信息

路径
/tech-stacks/influxdb/tutorial/IoT 监控平台实战.md
更新时间
2026/5/31