Hello World - 传感器数据写入与查询

知识库
知识库文档
/tech-stacks/influxdb/examples/Hello World - 传感器数据写入与查询.md

文档

InfluxDB Hello World:传感器数据写入与查询

目标

使用 InfluxDB 2.x 存储温度传感器数据,用 Flux 语言查询和聚合。

完整代码

命令行写入

# 准备 token 和环境变量
export INFLUX_TOKEN="my-super-secret-token"
export INFLUX_ORG="my-org"
export INFLUX_BUCKET="my-bucket"

# 写入单条数据(行协议格式)
curl -X POST "http://localhost:8086/api/v2/write?org=$INFLUX_ORG&bucket=$INFLUX_BUCKET&precision=s" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -d "temperature,location=lab value=23.5 $(date +%s)"

# 批量写入
curl -X POST "http://localhost:8086/api/v2/write?org=$INFLUX_ORG&bucket=$INFLUX_BUCKET&precision=s" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -d "temperature,location=lab value=23.5 1696128000
temperature,location=lab value=23.8 1696128060
temperature,location=lab value=24.1 1696128120
temperature,location=outdoor value=18.2 1696128000
temperature,location=outdoor value=18.5 1696128060
humidity,location=lab value=55.0 1696128000
humidity,location=lab value=54.2 1696128060"

Flux 查询

// 在 InfluxDB Web UI 的 Data Explorer 中执行

// 1. 查询最近 1 小时的数据
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "lab")

// 2. 聚合:每 30 秒的平均温度
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 30s, fn: mean)

// 3. 比较多个地点
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 1m, fn: mean)

// 4. 检测异常(偏离均值 2 标准差以上)
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean(column: "_value")
  |> map(fn: (r) => ({r with threshold: r._value + 5.0}))

Python 版

# pip install influxdb-client
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import time, random

# 配置
url = "http://localhost:8086"
token = "my-super-secret-token"
org = "my-org"
bucket = "my-bucket"

client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

# 写入数据
for i in range(20):
    point = (
        Point("temperature")
        .tag("location", "lab")
        .field("value", round(random.uniform(22.0, 26.0), 2))
        .time(time.time_ns())
    )
    write_api.write(bucket=bucket, org=org, record=point)
    time.sleep(1)

print("写入 20 条数据完成")

# 查询数据(Flux 查询)
query_api = client.query_api()

query = f'''
from(bucket: "{bucket}")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "lab")
  |> aggregateWindow(every: 10s, fn: mean)
'''

tables = query_api.query(query, org=org)

print("\n查询结果:")
for table in tables:
    for record in table.records:
        print(f"  时间: {record.get_time()}, 温度: {record.get_value():.1f}°C")

client.close()

Node.js 版

// npm install @influxdata/influxdb-client
const { InfluxDB, Point } = require('@influxdata/influxdb-client');

const token = 'my-super-secret-token';
const org = 'my-org';
const bucket = 'my-bucket';

const client = new InfluxDB({ url: 'http://localhost:8086', token });
const writeApi = client.getWriteApi(org, bucket, 's');

// 写入
const point = new Point('temperature')
    .tag('location', 'lab')
    .floatField('value', 23.5);

writeApi.writePoint(point);
writeApi.flush().then(() => console.log('写入完成'));

// 查询
const queryApi = client.getQueryApi(org);
const fluxQuery = `
  from(bucket: "${bucket}")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "temperature")
`;

queryApi.queryRows(fluxQuery, {
    next(row, tableMeta) {
        const field = tableMeta.toObject(row);
        console.log(`时间: ${row[0]}, 温度: ${field.value}`);
    },
    error(error) { console.error(error); },
    complete() { console.log('查询完成'); client.close(); }
});

预期输出

写入 20 条数据完成

查询结果:
  时间: 2024-09-01T10:00:01Z, 温度: 23.5°C
  时间: 2024-09-01T10:00:02Z, 温度: 24.1°C
  时间: 2024-09-01T10:00:03Z, 温度: 23.8°C
  ...

关键点

  • 行协议格式:measurement,tag=value field=value timestamp(空格分隔三部分)
  • measurement = 表名,tag = 索引维度(字符串),field = 实际值(数值)
  • aggregateWindow 是时序聚合的核心:按时间窗口做 mean/max/min/sum
  • Flux 管道风格:source |> filter() |> aggregate() |> yield()
  • InfluxDB 2.x 自带 Web UI(Port 8086),可直接可视化

信息

路径
/tech-stacks/influxdb/examples/Hello World - 传感器数据写入与查询.md
更新时间
2026/5/31