Logstash 入门教程:Grok 解析与管道架构
1. Logstash 在 ELK 中的位置
Filebeat(采集) → Logstash(处理) → Elasticsearch(存储) → Kibana(可视化)
↑ ↑
轻量级代理 也可以直接写 ES
Filebeat 更轻量,Logstash 更强处理能力。生产推荐 Filebeat → Logstash → ES。
2. Grok 模式详解
Grok 是 Logstash 的"正则表达式 DSL",把非结构化日志变成结构化 JSON。
内置模式
%{IP:client} # IP 地址
%{TIMESTAMP_ISO8601:ts} # ISO 时间
%{NUMBER:duration} # 数字
%{GREEDYDATA:message} # 剩余所有内容
自定义 Grok 模式
# patterns/custom
MY_APP_LOG \[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{GREEDYDATA:message}
# 使用
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{MY_APP_LOG}" }
}
Grok 调试
访问 Kibana → Dev Tools → Grok Debugger 或使用 http://grokdebug.herokuapp.com/
3. 常用 Filter 插件
| 插件 |
功能 |
示例 |
| grok |
正则解析 |
%{COMBINEDAPACHELOG} |
| mutate |
字段操作 |
rename / convert / add_field / remove |
| date |
时间解析 |
match => ["ts", "ISO8601"] |
| geoip |
IP → 地理 |
source => "client_ip" |
| useragent |
UA 解析 |
浏览器/OS/设备 |
| json |
JSON 解析 |
适合容器日志 |
| csv |
CSV 解析 |
适合业务日志 |
| dissect |
分割符解析(比 Grok 快) |
%{ts} %{+ts} %{level} %{msg} |
4. 持久化队列(Persistent Queue)
防止 Logstash 崩溃时丢数据:
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024
5. Pipeline-to-Pipeline 通信
- pipeline.id: intake
config.string: |
input { beats { port =>
output { pipeline { send_to =>
- pipeline.id: processing
config.string: |
input { pipeline { address =>
filter { ... }
output { elasticsearch { ... } }
6. 思考题
- Grok 和 Dissect 如何选?什么时候用 Dissect?
- Logstash 和 Fluentd 各有什么优缺点?
- 如何监控 Logstash Pipeline 的处理延迟?