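This article is based on the official documentation: https://www.elastic.co/guide/en/logstash/7.14/filter-plugins.html
It shows how to use the most common filters and will be extended over time.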
Without a filter
input { stdin {} }
filter {}
output { stdout {} }
Input and output:
1
{
"message" => "1",
"host" => "test3",
"@timestamp" => 2022-03-25T08:02:33.415Z,
"@version" => "1"
}
grok: regex matching
match: match a field
The basic match syntax is %{PATTERN:field_name}. The patterns that ship with grok can be located by running find in the Logstash directory:
find ./ | grep grok-patterns
./vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.1/patterns/ecs-v1/grok-patterns
./vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.1/patterns/legacy/grok-patterns
A simple example:
input { stdin {} }
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output { stdout {} }
Input and output:
55.3.244.1 GET /index.html 15824 0.043
{
"method" => "GET",
"@version" => "1",
"bytes" => "15824",
"@timestamp" => 2022-03-29T07:43:40.816Z,
"request" => "/index.html",
"client" => "55.3.244.1",
"host" => "test3",
"duration" => "0.043",
"message" => "55.3.244.1 GET /index.html 15824 0.043"
}
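To make the %{PATTERN:field} mechanics concrete, here is a minimal Python sketch. It assumes simplified stand-in regexes for IP, WORD, URIPATHPARAM and NUMBER (the real definitions in the grok-patterns files listed above are stricter), expands each reference into a named group, and searches the message without anchoring it, as grok does by default. compile_grok and grok are hypothetical helpers, not Logstash APIs.

```python
import re

# Simplified stand-ins for a few stock grok patterns (assumption: the real
# definitions in logstash-patterns-core are stricter).
PATTERNS = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "WORD": r"\b\w+\b",
    "URIPATHPARAM": r"/[^\s?]*(?:\?\S*)?",
    "NUMBER": r"[+-]?\d+(?:\.\d+)?",
}

# Matches %{NAME} or %{NAME:field}
GROK_REF = re.compile(r"%\{(\w+)(?::(\w+))?\}")


def compile_grok(expr, patterns=PATTERNS):
    """Expand every %{NAME:field} into a Python named group (?P<field>...)."""
    def expand(m):
        body = patterns[m.group(1)]
        field = m.group(2)
        return f"(?P<{field}>{body})" if field else f"(?:{body})"
    return re.compile(GROK_REF.sub(expand, expr))


def grok(message, expr):
    """Return the captured fields (all strings), or None when nothing matches."""
    m = compile_grok(expr).search(message)  # unanchored search
    return m.groupdict() if m else None


EXPR = "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
print(grok("55.3.244.1 GET /index.html 15824 0.043", EXPR))
# {'client': '55.3.244.1', 'method': 'GET', 'request': '/index.html', 'bytes': '15824', 'duration': '0.043'}
```

As in the Logstash output above, every capture is a string. grok itself can convert a capture with a suffix such as %{NUMBER:bytes:int}, which this sketch does not model.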
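Multiple patterns: match accepts an array of patterns for one field, and grok tries them in order.
input { stdin {} }
filter {
  grok {
    match => {
      "message" => [
        "Duration: %{NUMBER:duration}",
        "Speed: %{NUMBER:speed}"
      ]
    }
  }
}
output { stdout {} }
Input and output (the last input matches neither pattern, so the event is tagged _grokparsefailure):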
Speed: 123
{
"host" => "test3",
"@timestamp" => 2022-03-29T08:21:05.944Z,
"message" => "Speed: 123",
"@version" => "1",
"speed" => "123"
}
Duration: 123
{
"host" => "test3",
"duration" => "123",
"@timestamp" => 2022-03-29T08:22:00.517Z,
"message" => "Duration: 123",
"@version" => "1"
}
123
{
"host" => "test3",
"@timestamp" => 2022-03-29T08:22:03.095Z,
"message" => "123",
"@version" => "1",
"tags" => [
[0] "_grokparsefailure"
]
}
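break_on_match controls what happens when several patterns are given. With the default, true, grok stops at the first pattern that matches; set it to false to have grok try every pattern.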
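The pattern-list behaviour, including break_on_match and the _grokparsefailure tag, can be modelled with a short Python sketch. The two patterns are hand-expanded into named-group regexes with a simplified NUMBER (an assumption), and grok_multi is a hypothetical helper rather than part of Logstash.

```python
import re

NUMBER = r"[+-]?\d+(?:\.\d+)?"  # simplified stand-in for grok's NUMBER pattern

# The two patterns from the config above, expanded by hand into named groups
PATTERNS = [
    r"Duration: (?P<duration>" + NUMBER + ")",
    r"Speed: (?P<speed>" + NUMBER + ")",
]


def grok_multi(message, patterns, break_on_match=True):
    """Try each pattern in order and merge the captures into an event dict."""
    event = {"message": message}
    matched = False
    for pattern in patterns:
        m = re.search(pattern, message)
        if m is None:
            continue
        matched = True
        event.update(m.groupdict())
        if break_on_match:
            break  # default: stop at the first pattern that matches
    if not matched:
        event["tags"] = ["_grokparsefailure"]
    return event


print(grok_multi("Speed: 123", PATTERNS))
# {'message': 'Speed: 123', 'speed': '123'}
print(grok_multi("123", PATTERNS))
# {'message': '123', 'tags': ['_grokparsefailure']}
print(grok_multi("Duration: 5 Speed: 7", PATTERNS, break_on_match=False))
# {'message': 'Duration: 5 Speed: 7', 'duration': '5', 'speed': '7'}
```

With the default break_on_match, the last input would only yield duration, because the first pattern already matched.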
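Custom patterns
Use patterns_dir to specify a directory of custom patterns. Create the file patterns/test with the following content:
# custom pattern name, then its regular expression
PHONE_NUMBER ^1[3-9]\d{9}$
Because the pattern is anchored with ^ and $, it only matches when the whole (single-line) field is an 11-digit number that starts with 1 followed by a digit from 3 to 9.
input { stdin {} }
filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{PHONE_NUMBER:phone}" }
  }
}
output { stdout {} }
Input and output: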
123
{
"message" => "123",
"@version" => "1",
"host" => "test3",
"@timestamp" => 2022-03-29T08:11:53.344Z,
"tags" => [
[0] "_grokparsefailure"
]
}
13100000000
{
"message" => "13100000000",
"@version" => "1",
"host" => "test3",
"@timestamp" => 2022-03-29T08:11:59.463Z,
"phone" => "13100000000"
}
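A patterns file holds one pattern name and one regex per line. The following Python sketch is a hypothetical loader that assumes exactly that format plus # comment lines; it parses the file above and shows the effect of the ^ and $ anchors. It is not how grok reads the directory internally.

```python
import re

# Content of patterns/test from the example above
PATTERN_FILE = r"""
# custom pattern name, then its regular expression
PHONE_NUMBER ^1[3-9]\d{9}$
"""


def load_patterns(text):
    """Parse 'NAME regex' lines, skipping blank lines and # comments."""
    patterns = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, regex = line.split(None, 1)  # first run of whitespace separates them
        patterns[name] = regex
    return patterns


patterns = load_patterns(PATTERN_FILE)
phone = re.compile("(?P<phone>" + patterns["PHONE_NUMBER"] + ")")

for message in ["123", "13100000000", "call 13100000000"]:
    m = phone.search(message)
    print(message, "->", m.groupdict() if m else "_grokparsefailure")
# 123 -> _grokparsefailure
# 13100000000 -> {'phone': '13100000000'}
# call 13100000000 -> _grokparsefailure
```

The third input fails only because of the ^ anchor; dropping the anchors would let the pattern match a number embedded in a longer message.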
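keep_empty_captures: keep empty captures
When a pattern is allowed to match an empty string, such as this custom pattern added to the patterns file:
ANY .*
set keep_empty_captures to true to keep the empty value as a field; by default, empty captures are discarded.
input { stdin {} }
filter {
  grok {
    keep_empty_captures => true
    patterns_dir => ["./patterns"]
    match => {
      "message" => "test:%{ANY:test}"
    }
  }
}
output { stdout {} }
Input and output: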
123
{
"message" => "123",
"@timestamp" => 2022-03-29T09:02:43.857Z,
"host" => "test3",
"@version" => "1",
"tags" => [
[0] "_grokparsefailure"
]
}
test:123
{
"message" => "test:123",
"test" => "123",
"@timestamp" => 2022-03-29T09:02:48.244Z,
"host" => "test3",
"@version" => "1"
}
test:
{
"message" => "test:",
"test" => "", # kept because keep_empty_captures is true
"@timestamp" => 2022-03-29T09:02:51.964Z,
"host" => "test3",
"@version" => "1"
}
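A Python sketch of the keep_empty_captures rule, assuming grok simply discards captures that matched the empty string unless the option is set. The regex is the hand-expanded form of "test:%{ANY:test}", and grok_captures is a hypothetical helper.

```python
import re

ANY = ".*"  # the custom ANY pattern from the patterns file
TEST = re.compile("test:(?P<test>" + ANY + ")")


def grok_captures(message, keep_empty_captures=False):
    """Return captured fields, dropping empty ones unless keep_empty_captures."""
    m = TEST.search(message)
    if m is None:
        return None  # grok would tag the event with _grokparsefailure
    return {
        name: value
        for name, value in m.groupdict().items()
        if value is not None and (value != "" or keep_empty_captures)
    }


print(grok_captures("test:123"))                          # {'test': '123'}
print(grok_captures("test:"))                             # {}
print(grok_captures("test:", keep_empty_captures=True))   # {'test': ''}
print(grok_captures("123"))                               # None
```

Note that "test:" still counts as a match in both modes; the option only decides whether the empty field is kept.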
target: store the captures under a target field
input { stdin {} }
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    target => "info"
  }
}
output { stdout {} }
Input and output:
55.3.244.1 GET /index.html 15824 0.043
{
"message" => "55.3.244.1 GET /index.html 15824 0.043",
"@timestamp" => 2022-03-29T09:10:33.658Z,
"info" => {
"bytes" => "15824",
"request" => "/index.html",
"duration" => "0.043",
"method" => "GET",
"client" => "55.3.244.1"
},
"host" => "test3",
"@version" => "1"
}
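overwrite: let captures replace existing fields
overwrite defaults to [], so a capture never replaces an existing field with the same name; the captured value is appended instead and the field becomes an array. List a field in overwrite to have the capture replace its value. In the example below, the stdin input first adds an info field with the value "default".
input {
  stdin {
    add_field => { "info" => "default" }
  }
}
filter {
  grok {
    match => { "message" => "info: %{WORD:info}" }
    overwrite => [ "info" ]
  }
}
output { stdout {} }
Input and output: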
123
{
"@timestamp" => 2022-03-29T09:17:59.760Z,
"message" => "123",
"info" => "default",
"host" => "test3",
"tags" => [
[0] "_grokparsefailure"
],
"@version" => "1"
}
info: new_info
{
"message" => "info: new_info",
"@timestamp" => 2022-03-29T09:18:08.750Z,
"info" => "new_info",
"host" => "test3",
"@version" => "1"
}
Input and output without overwrite:
info: 123
{
"@version" => "1",
"@timestamp" => 2022-03-29T09:20:14.927Z,
"message" => "info: 123",
"info" => [
[0] "default",
[1] "123"
],
"host" => "test3"
}
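The overwrite behaviour comes down to one decision per captured field. A minimal Python sketch, assuming the append-to-array rule shown by the two outputs above (apply_capture is a hypothetical helper):

```python
def apply_capture(event, field, value, overwrite=()):
    """Store one grok capture in the event dict."""
    if field in overwrite or field not in event:
        event[field] = value                  # listed in overwrite, or a new field
    elif isinstance(event[field], list):
        event[field].append(value)            # already an array: append
    else:
        event[field] = [event[field], value]  # existing value first, capture second
    return event


print(apply_capture({"info": "default"}, "info", "123"))
# {'info': ['default', '123']}
print(apply_capture({"info": "default"}, "info", "new_info", overwrite=["info"]))
# {'info': 'new_info'}
```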
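mutate: modify fields
mutate supports the operations below, listed in the order in which mutate applies them when several appear in the same mutate block. The more involved ones come with an example; for the rest, see the official documentation.
coerce: set a default value
The default is applied only to a field that exists and whose value is nil (null). If the field does not exist, it is not added.
input { stdin {} }
filter {
  ruby { code => 'event.set("test1", nil)' }  # create a field whose value is nil
  mutate {
    coerce => {
      "test1" => "default value"  # test1 exists and is nil, so it gets the default
      "test2" => "default value"  # test2 does not exist, so nothing is added
    }
  }
}
output { stdout {} }
Input and output: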
123
{
"message" => "123",
"@timestamp" => 2022-03-28T03:55:49.797Z,
"test1" => "default value",
"host" => "test3",
"@version" => "1"
}
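The coerce rule can be expressed in a few lines of Python. This is a sketch of the behaviour described above, with None standing in for nil; coerce is a hypothetical helper, not the plugin's code.

```python
def coerce(event, defaults):
    """Fill in defaults only for fields that exist and hold None (nil)."""
    for field, default in defaults.items():
        if field in event and event[field] is None:
            event[field] = default
    return event


event = {"message": "123", "test1": None}
print(coerce(event, {"test1": "default value", "test2": "default value"}))
# {'message': '123', 'test1': 'default value'}
```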
rename: rename a field
input { stdin {} }
filter {
  mutate {
    rename => { "message" => "new_message" }
  }
}
output { stdout {} }
Input and output:
123
{
"new_message" => "123",
"@timestamp" => 2022-03-28T04:00:13.845Z,
"@version" => "1",
"host" => "test3"
}
update: update a field
If the field does not exist, nothing happens.
input { stdin {} }
filter {
  mutate {
    update => { "message" => "456" }
  }
}
output { stdout {} }
Input and output:
123
{
"message" => "456",
"@version" => "1",
"@timestamp" => 2022-03-28T04:11:18.504Z,
"host" => "test3"
}
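replace: replace a field's value
The new value can reference other fields with %{foo}. Unlike update, replace adds the field if it does not exist.
input { stdin {} }
filter {
  mutate {
    replace => { "message" => "%{host}: %{message}" }
  }
}
output { stdout {} }
Input and output: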
123
{
"@timestamp" => 2022-03-28T04:16:36.036Z,
"host" => "test3",
"@version" => "1",
"message" => "test3: 123"
}
456
{
"@timestamp" => 2022-03-28T04:16:39.136Z,
"host" => "test3",
"@version" => "1",
"message" => "test3: 456"
}
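The %{foo} references are expanded by Logstash's sprintf-style field formatting. A simplified Python sketch, assuming only top-level field names (no [nested][field] references or %{+date} formats) and that a reference to a missing field is left in place as literal text; sprintf here is a hypothetical helper:

```python
import re

FIELD_REF = re.compile(r"%\{([^}]+)\}")


def sprintf(template, event):
    """Replace %{field} with the field's value; unknown references stay as-is."""
    def lookup(m):
        name = m.group(1)
        return str(event[name]) if name in event else m.group(0)
    return FIELD_REF.sub(lookup, template)


print(sprintf("%{host}: %{message}", {"host": "test3", "message": "123"}))  # test3: 123
print(sprintf("%{missing} stays", {}))                                     # %{missing} stays
```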
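convert: convert a field's type
Valid target types are integer, float, string and boolean, plus integer_eu and float_eu for European-style number formats.
input { stdin {} }
filter {
  mutate {
    convert => { "message" => "float" }
  }
}
output { stdout {} }
Input and output: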
123
{
"message" => 123.0,
"@version" => "1",
"host" => "test3",
"@timestamp" => 2022-03-28T04:22:27.110Z
}
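gsub: regex replacement within a field
The array holds triples of field name, regular expression and replacement string. The example replaces every -, _ or / with a dot.
input { stdin {} }
filter {
  mutate {
    gsub => [
      "message", "[-_/]", "."
    ]
  }
}
output { stdout {} }
Input and output: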
2022/01/01
{
"message" => "2022.01.01",
"@timestamp" => 2022-03-28T06:32:02.663Z,
"host" => "test3",
"@version" => "1"
}
2022-01-01
{
"message" => "2022.01.01",
"@timestamp" => 2022-03-28T06:32:08.917Z,
"host" => "test3",
"@version" => "1"
}
2022_01_01
{
"message" => "2022.01.01",
"@timestamp" => 2022-03-28T06:32:17.842Z,
"host" => "test3",
"@version" => "1"
}
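The flat [field, regex, replacement, ...] array can be mirrored in Python to try out a regex before putting it in a config. This sketch is a hypothetical helper that only handles string fields (mutate also applies gsub to arrays of strings), and Python's re syntax differs from Logstash's regex engine in some edge cases.

```python
import re


def gsub(event, rules):
    """Apply a flat [field, regex, replacement, ...] list, as in mutate's gsub."""
    for i in range(0, len(rules), 3):
        field, pattern, replacement = rules[i:i + 3]
        value = event.get(field)
        if isinstance(value, str):  # this sketch only handles string fields
            event[field] = re.sub(pattern, replacement, value)
    return event


RULES = ["message", "[-_/]", "."]
for raw in ["2022/01/01", "2022-01-01", "2022_01_01"]:
    print(gsub({"message": raw}, RULES)["message"])  # 2022.01.01 each time
```

The leading - inside [-_/] is treated as a literal character rather than a range, in both Python and the Logstash config.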
uppercase: convert to upper case
input { stdin {} }
filter {
  mutate {
    uppercase => [ "message" ]
  }
}
output { stdout {} }
Input and output:
abcd
{
"@timestamp" => 2022-03-28T06:34:18.980Z,
"message" => "ABCD",
"host" => "test3",
"@version" => "1"
}
capitalize: capitalize the first letter
input { stdin {} }
filter {
  mutate {
    capitalize => [ "message" ]
  }
}
output { stdout {} }
Input and output:
abcd
{
"@version" => "1",
"host" => "test3",
"message" => "Abcd",
"@timestamp" => 2022-03-28T06:35:43.118Z
}
lowercase: convert to lower case
input { stdin {} }
filter {
  mutate {
    lowercase => [ "message" ]
  }
}
output { stdout {} }
Input and output:
ABCD
{
"@version" => "1",
"host" => "test3",
"@timestamp" => 2022-03-28T06:36:58.364Z,
"message" => "abcd"
}
strip: remove leading and trailing whitespace
Whitespace inside the value is kept.
input { stdin {} }
filter {
  mutate {
    strip => ["message"]
  }
}
output { stdout {} }
Input and output:
123 # input with leading spaces
{
"host" => "test3",
"@version" => "1",
"message" => "123",
"@timestamp" => 2022-03-28T06:39:45.745Z
}
123 # input with trailing spaces
{
"host" => "test3",
"@version" => "1",
"message" => "123",
"@timestamp" => 2022-03-28T06:39:51.762Z
}
123 123 # input with a space in the middle
{
"host" => "test3",
"@version" => "1",
"message" => "123 123",
"@timestamp" => 2022-03-28T06:40:42.050Z
}
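Removing fields and tags
remove_field: remove fields
remove_tag: remove tags
Both are common options that every filter plugin supports, not only mutate, and they are applied only when the filter succeeds.
input { stdin {} }
filter {
  mutate {
    remove_field => ["message"]
    remove_tag => ["json"]
  }
}
output { stdout {} }
Input and output: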
123
{
"@version" => "1",
"@timestamp" => 2022-03-28T06:45:27.784Z,
"host" => "test3"
}
split: split a string field into an array
input { stdin {} }
filter {
  mutate {
    split => { "message" => "," }
  }
}
output { stdout {} }
Input and output:
1,2,3
{
"@timestamp" => 2022-03-28T06:49:44.231Z,
"message" => [
[0] "1",
[1] "2",
[2] "3"
],
"host" => "test3",
"@version" => "1"
}
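join: join the elements of an array field into a string
The example adds an array field, test_array, in the stdin input and joins its elements with a comma.
input {
  stdin {
    add_field => { "test_array" => ["1", "2", "3"] }
  }
}
filter {
  mutate {
    join => { "test_array" => "," }
  }
}
output { stdout {} }
Input and output: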
123
{
"test_array" => "1,2,3",
"host" => "test3",
"message" => "123",
"@timestamp" => 2022-03-28T06:55:03.926Z,
"@version" => "1"
}
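split and join are inverses for simple values. A Python sketch of both, assuming (as the mutate documentation states) that split only acts on string fields and join only on array fields; mutate_split and mutate_join are hypothetical helper names:

```python
def mutate_split(event, field, separator):
    """Split a string field into a list; non-string fields are left alone."""
    if isinstance(event.get(field), str):
        event[field] = event[field].split(separator)
    return event


def mutate_join(event, field, separator):
    """Join a list field into a string; non-list fields are left alone."""
    if isinstance(event.get(field), list):
        event[field] = separator.join(str(v) for v in event[field])
    return event


event = mutate_split({"message": "1,2,3"}, "message", ",")
print(event)                                 # {'message': ['1', '2', '3']}
print(mutate_join(event, "message", ","))    # {'message': '1,2,3'}
```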
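merge: merge one field into another
merge => { "test2" => "test1" } appends the value of test1 to test2; test1 itself is unchanged. Merging two string fields turns the destination field into an array.
input {
  stdin {
    add_field => {
      "test1" => "1111"
      "test2" => "2222"
    }
  }
}
filter {
  mutate {
    merge => { "test2" => "test1" }
  }
}
output { stdout {} }
Input (an empty line) and output: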
{
"test2" => [
[0] "2222",
[1] "1111"
],
"@version" => "1",
"test1" => "1111",
"@timestamp" => 2022-03-28T06:58:01.010Z,
"message" => "",
"host" => "test3"
}
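A Python sketch of merge for the string and array cases, following the mutate documentation's rule that string fields are converted to arrays before merging. Hash fields and missing fields are deliberately not modelled, and merge is a hypothetical helper.

```python
def merge(event, dest, src):
    """Append the value(s) of src to dest; strings are wrapped in lists first."""
    if dest not in event or src not in event:
        return event  # missing fields are not modelled in this sketch
    dest_value, src_value = event[dest], event[src]
    if isinstance(dest_value, dict) or isinstance(src_value, dict):
        return event  # hash fields are not modelled in this sketch
    dest_list = dest_value if isinstance(dest_value, list) else [dest_value]
    src_list = src_value if isinstance(src_value, list) else [src_value]
    event[dest] = dest_list + src_list  # src itself is left unchanged
    return event


print(merge({"test1": "1111", "test2": "2222"}, "test2", "test1"))
# {'test1': '1111', 'test2': ['2222', '1111']}
```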
copy: copy a field
input { stdin {} }
filter {
  mutate {
    copy => { "message" => "message_copy" }
  }
}
output { stdout {} }
Input and output:
123
{
"message" => "123",
"@version" => "1",
"host" => "test3",
"@timestamp" => 2022-03-28T07:00:00.966Z,
"message_copy" => "123"
}
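drop: discard events
percentage: drop probability
Drop 50% of events:
input { stdin {} }
filter {
  drop {
    percentage => 50
  }
}
output { stdout {} }
About half of the events are dropped. percentage is a probability applied to each event independently, so an event being dropped says nothing about whether the next one will be: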
1 # input 1, dropped
2 # input 2, dropped
3 # input 3, passed through
{
"host" => "test3",
"@timestamp" => 2022-03-25T08:21:23.656Z,
"message" => "3",
"@version" => "1"
}
4 # input 4, passed through
{
"host" => "test3",
"@timestamp" => 2022-03-25T08:21:25.729Z,
"message" => "4",
"@version" => "1"
}
5 # input 5, dropped
6 # input 6, passed through
{
"host" => "test3",
"@timestamp" => 2022-03-25T08:21:33.179Z,
"message" => "6",
"@version" => "1"
}
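To see why the dropped fraction only approaches 50% over many events, the following Python sketch models percentage as an independent random draw per event. This is an assumption about the mechanism that is consistent with the output above, not a copy of the plugin's code; should_drop and run are hypothetical helpers, and the fixed seed only makes the run reproducible.

```python
import random


def should_drop(percentage, rng=random):
    """Return True if this event should be dropped (independent draw per event)."""
    return rng.random() * 100 < percentage


def run(events, percentage, seed=0):
    """Keep the events that survive the drop filter."""
    rng = random.Random(seed)  # fixed seed so the run is reproducible
    return [e for e in events if not should_drop(percentage, rng)]


kept = run(range(10_000), 50)
print(f"kept {len(kept)} of 10000 events")  # close to 5000, but rarely exactly 5000
```

Over a handful of events, as in the six inputs above, the split can be far from half; the ratio only settles near 50% over many events.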
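geoip: look up information for an IP address
The geoip filter looks up an IP address and adds more detailed information about it, such as its geographic location.
input { stdin {} }
filter {
  geoip {
    source => "message"
  }
}
output { stdout {} }
Input and output:
1 # not a valid IP address, so the lookup fails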
{
"host" => "freezej",
"@version" => "1",
"message" => "1",
"@timestamp" => 2022-03-30T02:39:10.560Z,
"geoip" => {},
"tags" => [
[0] "_geoip_lookup_failure"
]
}
114.114.114.114
{
"host" => "freezej",
"message" => "114.114.114.114",
"@timestamp" => 2022-03-30T02:39:51.128Z,
"@version" => "1",
"geoip" => {
"longitude" => 113.722,
"continent_code" => "AS",
"latitude" => 34.7732,
"location" => {
"lon" => 113.722,
"lat" => 34.7732
},
"country_code3" => "CN",
"country_code2" => "CN",
"country_name" => "China",
"timezone" => "Asia/Shanghai",
"ip" => "114.114.114.114"
}
}
8.8.8.8
{
"host" => "freezej",
"message" => "8.8.8.8",
"@timestamp" => 2022-03-30T02:40:05.472Z,
"@version" => "1",
"geoip" => {
"longitude" => -97.822,
"continent_code" => "NA",
"latitude" => 37.751,
"location" => {
"lon" => -97.822,
"lat" => 37.751
},
"country_code3" => "US",
"country_code2" => "US",
"country_name" => "United States",
"timezone" => "America/Chicago",
"ip" => "8.8.8.8"
}
}
Check the update status of the GeoIP databases through the Logstash monitoring API:
curl -XGET 'localhost:9600/_node/stats/geoip_download_manager?pretty'
{
"host" : "freezej",
"version" : "7.14.2",
"http_address" : "127.0.0.1:9600",
"id" : "0a4c5c86-3103-471c-b3e8-e66fd8aaeb17",
"name" : "freezej",
"ephemeral_id" : "eafe5e1d-9ee2-421b-8a62-fab3d5023180",
"status" : "green",
"snapshot" : false,
"pipeline" : {
"workers" : 1,
"batch_size" : 125,
"batch_delay" : 50
},
"geoip_download_manager" : {
"download_stats" : {
"last_checked_at" : "2022-03-30T10:51:08+08:00",
"failures" : 0,
"status" : "succeeded",
"successes" : 1
},
"database" : {
"ASN" : {
"status" : "up_to_date",
"fail_check_in_days" : 0,
"last_updated_at" : "2022-03-30T10:38:09+08:00"
},
"City" : {
"status" : "up_to_date",
"fail_check_in_days" : 0,
"last_updated_at" : "2022-03-30T10:38:09+08:00"
}
}
}
}
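For scripted monitoring, the response above can be reduced to one status per database. A Python sketch, assuming the JSON layout shown above (geoip_download_manager.database.<name>.status); database_status and fetch_stats are hypothetical helpers, and fetch_stats is defined but not called so the example runs without a Logstash node.

```python
import json
from urllib.request import urlopen

STATS_URL = "http://localhost:9600/_node/stats/geoip_download_manager"


def database_status(stats):
    """Map each GeoIP database (e.g. ASN, City) to its status string."""
    databases = stats.get("geoip_download_manager", {}).get("database", {})
    return {name: info.get("status") for name, info in databases.items()}


def fetch_stats(url=STATS_URL):
    """Query a running Logstash node (not called in the example below)."""
    with urlopen(url, timeout=5) as resp:
        return json.load(resp)


# Trimmed copy of the response shown above
sample = json.loads("""
{"geoip_download_manager": {
   "download_stats": {"status": "succeeded", "failures": 0},
   "database": {
     "ASN":  {"status": "up_to_date", "fail_check_in_days": 0},
     "City": {"status": "up_to_date", "fail_check_in_days": 0}}}}
""")
print(database_status(sample))  # {'ASN': 'up_to_date', 'City': 'up_to_date'}
```

Against a live node, database_status(fetch_stats()) would give the same mapping for the current state.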
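date: parse dates
Parses the date in a field and stores the result in a target field (by default @timestamp).
input { stdin {} }
filter {
  date {
    tag_on_failure => [ "match_fail" ]  # tag added on failure, default "_dateparsefailure"
    match => [
      "message",                    # field to parse
      "ISO8601",                    # format 1, ISO 8601: 2015-01-01T01:12:23
      "UNIX",                       # format 2, Unix timestamp in seconds: 1326149001
      "UNIX_MS",                    # format 3, Unix timestamp in milliseconds: 1366125117000
      "yyyy年MM月dd日HH时mm分ss秒"   # format 4, custom: 2022年3月15日15时00分00秒
    ]
    target => "match_result"        # where to store the result, default "@timestamp"
    timezone => "Asia/Shanghai"     # time zone of the input
    locale => "zh-cn"               # locale (language) of the input
  }
}
output { stdout {} }
Input and output: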
test # matches none of the formats
{
"@version" => "1",
"@timestamp" => 2022-03-25T09:14:13.621Z,
"host" => "test3",
"message" => "test",
"tags" => [
[0] "match_fail"
]
}
2015-01-01T01:12:23 # ISO8601
{
"@version" => "1",
"@timestamp" => 2022-03-25T09:14:19.060Z,
"host" => "test3",
"match_result" => 2014-12-31T17:12:23.000Z,
"message" => "2015-01-01T01:12:23"
}
1326149001 # Unix timestamp
{
"@version" => "1",
"@timestamp" => 2022-03-25T09:14:23.102Z,
"host" => "test3",
"match_result" => 2012-01-09T22:43:21.000Z,
"message" => "1326149001"
}
1366125117000 # Unix timestamp (milliseconds)
{
"@version" => "1",
"@timestamp" => 2022-03-25T09:14:27.302Z,
"host" => "test3",
"match_result" => 2013-04-16T15:11:57.000Z,
"message" => "1366125117000"
}
2022年3月15日15时00分00秒 # custom format
{
"@version" => "1",
"@timestamp" => 2022-03-25T09:14:31.134Z,
"host" => "test3",
"match_result" => 2022-03-15T07:00:00.000Z,
"message" => "2022年3月15日15时00分00秒"
}
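The results above can be reproduced with a Python sketch that tries the same four formats in order and converts to UTC. Assumptions: Asia/Shanghai is modelled as a fixed UTC+8 offset (it has had no DST since 1991), ISO8601 is reduced to the single yyyy-MM-ddTHH:mm:ss form used here, and the parse_* helpers are hypothetical, not the date filter's internals.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Shanghai modelled as a fixed UTC+8 offset (no DST since 1991)
SHANGHAI = timezone(timedelta(hours=8))


def parse_iso8601(s):
    # Simplified: only the "yyyy-MM-ddTHH:mm:ss" form used in the example
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=SHANGHAI)


def parse_unix(s):
    return datetime.fromtimestamp(float(s), tz=timezone.utc)


def parse_unix_ms(s):
    return datetime.fromtimestamp(float(s) / 1000, tz=timezone.utc)


def parse_custom(s):
    # Python equivalent of the Joda pattern yyyy年MM月dd日HH时mm分ss秒
    return datetime.strptime(s, "%Y年%m月%d日%H时%M分%S秒").replace(tzinfo=SHANGHAI)


FORMATS = [parse_iso8601, parse_unix, parse_unix_ms, parse_custom]


def parse_date(value, formats=FORMATS, tag_on_failure="match_fail"):
    """Try each format in order; return (UTC datetime, None) or (None, tag)."""
    for parse in formats:
        try:
            return parse(value).astimezone(timezone.utc), None
        except (ValueError, OverflowError, OSError):
            continue  # e.g. 1366125117000 read as seconds is out of range
    return None, tag_on_failure


for value in ["test", "2015-01-01T01:12:23", "1326149001",
              "1366125117000", "2022年3月15日15时00分00秒"]:
    result, tag = parse_date(value)
    print(value, "->", result.isoformat() if result else tag)
# test -> match_fail
# 2015-01-01T01:12:23 -> 2014-12-31T17:12:23+00:00
# 1326149001 -> 2012-01-09T22:43:21+00:00
# 1366125117000 -> 2013-04-16T15:11:57+00:00
# 2022年3月15日15时00分00秒 -> 2022-03-15T07:00:00+00:00
```

The timezone setting only affects inputs without their own offset (the ISO8601 and custom examples); Unix timestamps are already absolute, which is why their results do not shift by eight hours.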