Logstash

技术栈
工具链
日志ELK数据管道ETL日志收集

概览

Logstash

Logstash 是 Elastic Stack(ELK)中的服务器端数据处理管道,能够从多种来源采集数据、转换数据,并将其发送到指定的存储后端。

核心价值:作为数据管道的"瑞士军刀",支持 200+ 插件(Input / Filter / Output),处理半结构化日志的理想工具。

关键特性

  • Input 插件:File、Beats、Kafka、JDBC、HTTP、Syslog...
  • Filter 插件:Grok、Mutate、Date、GeoIP、JSON...
  • Output 插件:Elasticsearch、Kafka、S3、File、Email...
  • 持久化队列(Persistent Queue)防止数据丢失
  • 死信队列(Dead Letter Queue)处理异常数据

安装

1. 环境准备

  • 操作系统:Linux(推荐)、macOS、Windows
  • Java:JDK 11 / 17 LTS(Logstash 依赖 JVM)
  • 端口:5044(Beats 输入)、9600(API)
  • 内存:生产环境建议 2GB+ JVM 堆
  • Elasticsearch:作为输出端需要提前运行

2. 安装命令

Docker(推荐快速体验)

docker run -d --name logstash \
  -p 5044:5044 -p 9600:9600 \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  docker.elastic.co/logstash/logstash:8.12.0

Ubuntu

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elastic-keyring.gpg
sudo apt install -y apt-transport-https
echo "deb [signed-by=/usr/share/keyrings/elastic-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt update
sudo apt install -y logstash

# 配置 JVM 堆大小
sudo tee -a /etc/logstash/jvm.options <;<EOF
-Xms1g
-Xmx1g
EOF

sudo systemctl enable --now logstash

macOS

brew tap elastic/tap
brew install elastic/tap/logstash-full

3. 配置结构

Logstash 配置文件由三部分组成:

input { ... }    # 数据来源
filter { ... }   # 数据处理(可选)
output { ... }   # 数据输出

最小配置示例(stdin → stdout)

input { stdin {} }
output { stdout { codec => rubydebug } }

4. 常见安装问题

问题 解决方案
Pipeline 启动缓慢 检查 Grok 模式是否正确,用 --config.test_and_exit 验证
Java heap space OOM 增大 jvm.options-Xmx,或优化 Filter
Elasticsearch 连接失败 检查 ES 地址、端口、证书(8.x 默认启用 SSL)
File Input 不读取 Logstash 记录 sincedb 文件,删除后重新读取
消息丢失 启用持久化队列:queue.type: persisted

示例

Logstash Nginx 日志解析管道

目标

使用 Logstash 解析 Nginx Access Log,提取结构化字段,写入 Elasticsearch。

完整配置

logstash-nginx.conf

input {
  # 方案A:读取文件
  file {
    path => "/var/log/nginx/access*.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/nginx_sincedb"
    codec => plain
  }

  # 方案B:通过 Beats 接收(推荐生产用)
  beats {
    port => 5044
  }
}

filter {
  # 标准 Nginx 日志格式解析
  grok {
    match => {
      "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}|-) "(?:%{DATA:http_referer}|-)" "%{DATA:http_user_agent}"'
    }
  }

  # 时间字段转换
  date {
    match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
    target => "@timestamp"
  }

  # 地理位置解析(基于 IP)
  geoip {
    source => "remote_addr"
    target => "geoip"
  }

  # 用户代理解析
  useragent {
    source => "http_user_agent"
    target => "ua"
  }

  # 添加字段
  mutate {
    convert => {
      "body_bytes_sent" => "integer"
      "status" => "integer"
    }
    add_field => {
      "environment" => "production"
    }
    remove_field => ["message", "@version"]
  }
}

output {
  # 写入 Elasticsearch
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    index => "nginx-access-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "${ES_PASSWORD}"
    ssl => true
    ssl_certificate_verification => false
  }

  # 调试用:控制台输出(仅开发环境)
  stdout {
    codec => rubydebug
  }
}

运行步骤

# 1. 验证配置
logstash -f logstash-nginx.conf --config.test_and_exit

# 2. 运行
logstash -f logstash-nginx.conf

# 3. Docker 方式
docker run -d --name logstash-nginx \
  -v $(pwd)/logstash-nginx.conf:/usr/share/logstash/pipeline/logstash.conf:ro \
  -v /var/log/nginx:/var/log/nginx:ro \
  docker.elastic.co/logstash/logstash:8.12.0

预期输出

解析后的日志在 Elasticsearch 中可查询结构化字段(status、method、geoip.country_name 等),并在 Kibana 中可视化。

教程

Logstash 入门教程:Grok 解析与管道架构

1. Logstash 在 ELK 中的位置

Filebeat(采集) → Logstash(处理) → Elasticsearch(存储) → Kibana(可视化)
        ↑                                    ↑
    轻量级代理                         也可以直接写 ES

Filebeat 更轻量,Logstash 更强处理能力。生产推荐 Filebeat → Logstash → ES

2. Grok 模式详解

Grok 是 Logstash 的"正则表达式 DSL",把非结构化日志变成结构化 JSON。

内置模式

%{IP:client}           # IP 地址
%{TIMESTAMP_ISO8601:ts} # ISO 时间
%{NUMBER:duration}     # 数字
%{GREEDYDATA:message}  # 剩余所有内容

自定义 Grok 模式

# patterns/custom
MY_APP_LOG \[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{GREEDYDATA:message}

# 使用
grok {
  patterns_dir => ["./patterns"]
  match => { "message" => "%{MY_APP_LOG}" }
}

Grok 调试

访问 Kibana → Dev Tools → Grok Debugger 或使用 http://grokdebug.herokuapp.com/

3. 常用 Filter 插件

插件 功能 示例
grok 正则解析 %{COMBINEDAPACHELOG}
mutate 字段操作 rename / convert / add_field / remove
date 时间解析 match => ["ts", "ISO8601"]
geoip IP → 地理 source => "client_ip"
useragent UA 解析 浏览器/OS/设备
json JSON 解析 适合容器日志
csv CSV 解析 适合业务日志
dissect 分割符解析(比 Grok 快) %{ts} %{+ts} %{level} %{msg}

4. 持久化队列(Persistent Queue)

防止 Logstash 崩溃时丢数据:

# logstash.yml
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024

5. Pipeline-to-Pipeline 通信

# pipelines.yml
- pipeline.id: intake
  config.string: |
    input { beats { port =>; 5044 } }
    output { pipeline { send_to =>; [processing] } }

- pipeline.id: processing
  config.string: |
    input { pipeline { address =>; processing } }
    filter { ... }
    output { elasticsearch { ... } }

6. 思考题

  1. Grok 和 Dissect 如何选?什么时候用 Dissect?
  2. Logstash 和 Fluentd 各有什么优缺点?
  3. 如何监控 Logstash Pipeline 的处理延迟?

参考资料

暂无参考文献