InfluxDB

技术栈
数据库
nosqltime-seriesmonitoringiotanalyticsmetrics

概览

InfluxDB 是专为时序数据设计的 NoSQL 数据库,擅长存储和查询带时间戳的测量数据(如传感器读数、服务器指标、金融行情)。使用类 SQL 的 Flux 或 InfluxQL 查询语言,支持自动数据过期(Retention Policy)、持续查询(Continuous Query)和下采样。是 TICK Stack(Telegraf + InfluxDB + Chronograf + Kapacitor)的核心,广泛用于 DevOps 监控和物联网场景。

安装

InfluxDB 安装指南

1. 环境准备

要求 说明
操作系统 Linux、macOS、Windows(Docker 推荐)
内存 最低 512 MB,推荐 2 GB+
磁盘 SSD 推荐(时序数据写密集)
端口 8086(HTTP API + UI)
版本 InfluxDB 2.x(最新) 或 1.8(经典稳定版)

2. 安装命令

Docker(推荐)

# InfluxDB 2.x(含 Web UI)
docker run -d --name influxdb2 \
  -p 8086:8086 \
  -e DOCKER_INFLUXDB_INIT_MODE=setup \
  -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
  -e DOCKER_INFLUXDB_INIT_PASSWORD=password123 \
  -e DOCKER_INFLUXDB_INIT_ORG=my-org \
  -e DOCKER_INFLUXDB_INIT_BUCKET=my-bucket \
  -e DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-token \
  -v influxdb2_data:/var/lib/influxdb2 \
  influxdb:latest

# 访问 http://localhost:8086(自带 Data Explorer + Dashboard)

Ubuntu/Debian

# InfluxDB 2.x
wget -q https://repos.influxdata.com/influxdata-archive_compat.key
echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c &;& cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

sudo apt update
sudo apt install -y influxdb2

# 启动
sudo systemctl start influxdb
sudo systemctl enable influxdb

# 首次设置
influx setup \
  --username admin \
  --password password123 \
  --org my-org \
  --bucket my-bucket \
  --token my-super-secret-token \
  --force

macOS

brew install influxdb-cli influxdb
brew services start influxdb

# 或直接运行 influxd
influxd

TICK Stack 一键(Telegraf + InfluxDB + Chronograf + Kapacitor)

# 这些组件可分别安装:
brew install telegraf          # 数据采集代理
brew install chronograf        # 可视化管理界面(InfluxDB 1.x)
# InfluxDB 2.x 自带 Web UI,无需单独 Chronograf

3. 常见安装问题

Q1: 端口 8086 被占用

# 修改配置
sudo vim /etc/influxdb/config.toml
# 找到 http-bind-address = ":8086",改为其他端口

Q2: 首次 setup 失败 "instance already initialized"

InfluxDB 2.x 只能 setup 一次,如需重置:

# 删除 boltdb 存储
sudo rm -r /var/lib/influxdb2/
# 重新启动
sudo systemctl restart influxdb
# 再次执行 influx setup

Q3: InfluxDB 1.x vs 2.x 选择

版本 InfluxQL Flux 推荐场景
1.8 ✅ 原生 需插件 已有 1.x 生态(Chronograf)
2.x 可兼容 ✅ 原生 新项目,Web UI 更完善

Q4: Docker 中 token 管理

Token 用于 API 认证,务必保存:

# 查看已有 token
docker exec -it influxdb2 influx auth list

# 数据写入需带 token
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
  -H "Authorization: Token my-super-secret-token" \
  -d "temperature,location=lab value=23.5"

示例

InfluxDB Hello World:传感器数据写入与查询

目标

使用 InfluxDB 2.x 存储温度传感器数据,用 Flux 语言查询和聚合。

完整代码

命令行写入

# 准备 token 和环境变量
export INFLUX_TOKEN="my-super-secret-token"
export INFLUX_ORG="my-org"
export INFLUX_BUCKET="my-bucket"

# 写入单条数据(行协议格式)
curl -X POST "http://localhost:8086/api/v2/write?org=$INFLUX_ORG&bucket=$INFLUX_BUCKET&precision=s" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -d "temperature,location=lab value=23.5 $(date +%s)"

# 批量写入
curl -X POST "http://localhost:8086/api/v2/write?org=$INFLUX_ORG&bucket=$INFLUX_BUCKET&precision=s" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -d "temperature,location=lab value=23.5 1696128000
temperature,location=lab value=23.8 1696128060
temperature,location=lab value=24.1 1696128120
temperature,location=outdoor value=18.2 1696128000
temperature,location=outdoor value=18.5 1696128060
humidity,location=lab value=55.0 1696128000
humidity,location=lab value=54.2 1696128060"

Flux 查询

// 在 InfluxDB Web UI 的 Data Explorer 中执行

// 1. 查询最近 1 小时的数据
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "lab")

// 2. 聚合:每 30 秒的平均温度
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 30s, fn: mean)

// 3. 比较多个地点
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 1m, fn: mean)

// 4. 检测异常(偏离均值 2 标准差以上)
from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean(column: "_value")
  |> map(fn: (r) => ({r with threshold: r._value + 5.0}))

Python 版

# pip install influxdb-client
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import time, random

# 配置
url = "http://localhost:8086"
token = "my-super-secret-token"
org = "my-org"
bucket = "my-bucket"

client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

# 写入数据
for i in range(20):
    point = (
        Point("temperature")
        .tag("location", "lab")
        .field("value", round(random.uniform(22.0, 26.0), 2))
        .time(time.time_ns())
    )
    write_api.write(bucket=bucket, org=org, record=point)
    time.sleep(1)

print("写入 20 条数据完成")

# 查询数据(Flux 查询)
query_api = client.query_api()

query = f'''
from(bucket: "{bucket}")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "lab")
  |> aggregateWindow(every: 10s, fn: mean)
'''

tables = query_api.query(query, org=org)

print("\n查询结果:")
for table in tables:
    for record in table.records:
        print(f"  时间: {record.get_time()}, 温度: {record.get_value():.1f}°C")

client.close()

Node.js 版

// npm install @influxdata/influxdb-client
const { InfluxDB, Point } = require('@influxdata/influxdb-client');

const token = 'my-super-secret-token';
const org = 'my-org';
const bucket = 'my-bucket';

const client = new InfluxDB({ url: 'http://localhost:8086', token });
const writeApi = client.getWriteApi(org, bucket, 's');

// 写入
const point = new Point('temperature')
    .tag('location', 'lab')
    .floatField('value', 23.5);

writeApi.writePoint(point);
writeApi.flush().then(() => console.log('写入完成'));

// 查询
const queryApi = client.getQueryApi(org);
const fluxQuery = `
  from(bucket: "${bucket}")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "temperature")
`;

queryApi.queryRows(fluxQuery, {
    next(row, tableMeta) {
        const field = tableMeta.toObject(row);
        console.log(`时间: ${row[0]}, 温度: ${field.value}`);
    },
    error(error) { console.error(error); },
    complete() { console.log('查询完成'); client.close(); }
});

预期输出

写入 20 条数据完成

查询结果:
  时间: 2024-09-01T10:00:01Z, 温度: 23.5°C
  时间: 2024-09-01T10:00:02Z, 温度: 24.1°C
  时间: 2024-09-01T10:00:03Z, 温度: 23.8°C
  ...

关键点

  • 行协议格式:measurement,tag=value field=value timestamp(空格分隔三部分)
  • measurement = 表名,tag = 索引维度(字符串),field = 实际值(数值)
  • aggregateWindow 是时序聚合的核心:按时间窗口做 mean/max/min/sum
  • Flux 管道风格:source |> filter() |> aggregate() |> yield()
  • InfluxDB 2.x 自带 Web UI(Port 8086),可直接可视化

教程

InfluxDB 从零到实战:IoT 监控平台

1. 背景与概念

1.1 时序数据特征

时序数据 = 时间戳 + 标签(维度)+ 测量值:

温度读数:  timestamp=10:00, device=ESP32-001, location=lab, value=23.5
CPU 指标:  timestamp=10:01, host=server01, cpu_usage=67.3%

特点是写多读少、时间有序、不可变(过去数据不会更新)。

1.2 行协议拆解

temperature,location=lab,device=esp32 value=23.5,humidity=55 1696128000
└─┬──────┘ └───────┬──────────────┘ └──────────┬─────────┘ └────┬──┘
 measurement     tag set                    field set        timestamp
部分 必填 说明
measurement 类表名
tag set 索引维度,字符串类型
field set 测量值,支持 float/int/bool/string
timestamp 纳秒精度 Unix 时间戳(默认服务器时间)

2. 分步实战:构建 IoT 环境监控系统

场景

大学实验室部署多个传感器节点,采集温湿度。需要实时仪表盘、历史趋势和告警。

步骤一:架构设计

ESP32 传感器 ──MQTT──▶ Telegraf ──写入──▶ InfluxDB ──查询──▶ Grafana
                                          │
                                          └──▶ Kapacitor (告警)

步骤二:模拟传感器数据

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import time, random, uuid

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

sensors = {
    'lab-401': {'location': '四楼实验室', 'floor': '4'},
    'lab-502': {'location': '五楼实验室', 'floor': '5'},
    'server-room': {'location': '服务器机房', 'floor': '1'}
}

while True:
    for sensor_id, tags in sensors.items():
        # 写入温度
        temp_point = (
            Point("environment")
            .tag("sensor_id", sensor_id)
            .tag("location", tags['location'])
            .field("temperature", round(random.uniform(18, 28), 2))
            .field("humidity", round(random.uniform(30, 70), 2))
        )
        write_api.write(bucket="iot", org="my-org", record=temp_point)
    time.sleep(5)

步骤三:核心查询

// 1. 实时仪表盘:最近 5 分钟平均温度
from(bucket: "iot")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> group(columns: ["sensor_id"])
  |> aggregateWindow(every: 10s, fn: mean)

// 2. 过去 24 小时趋势(按小时聚合)
from(bucket: "iot")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> aggregateWindow(every: 1h, fn: mean)
  |> group(columns: ["sensor_id"])

// 3. 热力图:哪个传感器平均温度最高
from(bucket: "iot")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> group(columns: ["sensor_id"])
  |> mean()
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 5)

// 4. 异常检测(温度超过阈值)
from(bucket: "iot")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> filter(fn: (r) => r._value > 30.0)   // 超过 30 度
  |> keep(columns: ["_time", "sensor_id", "_value"])

// 5. 计算 delta(变化速率)
from(bucket: "iot")
  |> range(start: -30m)
  |> filter(fn: (r) => r._measurement == "environment")
  |> filter(fn: (r) => r._field == "temperature")
  |> derivative(unit: 1m)

步骤四:设置 Grafana 可视化

# 1. 启动 Grafana
docker run -d -p 3000:3000 --name grafana grafana/grafana

# 2. 访问 http://localhost:3000(默认 admin/admin)
# 3. Add data source → InfluxDB
#    - Query Language: Flux
#    - URL: http://influxdb:8086
#    - Organization: my-org
#    - Token: my-token
# 4. 创建 Dashboard → Add Panel → 粘贴 Flux 查询

步骤五:Downsampling 下采样

// 创建下采样任务:每小时聚合一次,保留到长期存储桶
option task = {
    name: "hourly_downsample",
    every: 1h,
}

from(bucket: "iot")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "environment")
  |> aggregateWindow(every: 1h, fn: mean)
  |> to(bucket: "iot_downsampled", org: "my-org")

3. 思考题

  1. 1000 个传感器每秒上报一次数据,如何设计 Retention Policy 避免磁盘爆炸?
  2. Flux 的 aggregateWindowwindow 有什么区别?何时用哪个?
  3. InfluxDB 2.x 使用 _field_value 等特殊列。如果一条记录有多个 field(温度+湿度),它们如何存储和查询?

参考资料

  1. [1] InfluxData Inc.. InfluxDB 2.x 官方文档. 2024.
  2. [2] Paul Dix. Time Series Databases: InfluxDB 深度指南. 2021.
  3. [3] Perry Lea. IoT and Edge Computing for Architects (Ch 7: Time Series Data). 2020.