FreezeJ' Blog

Introduction to Logstash Filters

2022-03-25

This post follows the official documentation: https://www.elastic.co/guide/en/logstash/7.14/filter-plugins.html
It covers how to use the common filters and will be extended over time.

Without a filter

input { stdin {} }

filter {}

output { stdout {} }

Input and output:

1
{
       "message" => "1",
          "host" => "test3",
    "@timestamp" => 2022-03-25T08:02:33.415Z,
      "@version" => "1"
}

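grok: regular-expression matching

match: match a field

The basic match syntax is %{PATTERN:field_name}. The patterns that grok supports by default can be located by running find in the Logstash directory: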

find ./ | grep grok-patterns
./vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.1/patterns/ecs-v1/grok-patterns
./vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.1/patterns/legacy/grok-patterns
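
To make the %{PATTERN:field} syntax more concrete, here is a minimal Python sketch of the idea: each reference is expanded into a named capture group, and the captured groups become fields. The PATTERNS table and the names grok_to_regex and grok_match are hypothetical, and the regular expressions are simplified stand-ins rather than the real definitions shipped in logstash-patterns-core.

```python
import re

# Hypothetical, simplified stand-ins for a few grok patterns.
# The real definitions in the grok-patterns files are more thorough.
PATTERNS = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "WORD": r"\w+",
    "NUMBER": r"-?\d+(?:\.\d+)?",
    "URIPATHPARAM": r"/\S*",
}


def grok_to_regex(expression):
    """Expand every %{PATTERN:field} into a named capture group."""
    def expand(m):
        return "(?P<%s>%s)" % (m.group(2), PATTERNS[m.group(1)])
    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", expand, expression))


def grok_match(expression, message):
    """Return the captured fields, or None when the message does not match."""
    m = grok_to_regex(expression).search(message)
    return m.groupdict() if m else None


fields = grok_match(
    "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}",
    "55.3.244.1 GET /index.html 15824 0.043",
)
print(fields)
```

As in grok, every captured value is a string; converting "15824" to a number is a separate step.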

A simple example:

input { stdin {} }

filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
}

output { stdout {} }

Input and output:

55.3.244.1 GET /index.html 15824 0.043
{
        "method" => "GET",
      "@version" => "1",
         "bytes" => "15824",
    "@timestamp" => 2022-03-29T07:43:40.816Z,
       "request" => "/index.html",
        "client" => "55.3.244.1",
          "host" => "test3",
      "duration" => "0.043",
       "message" => "55.3.244.1 GET /index.html 15824 0.043"
}

Multiple patterns:

input { stdin {} }

filter {
      grok {
        match => {
          "message" => [
            "Duration: %{NUMBER:duration}",
            "Speed: %{NUMBER:speed}"
          ]
        }
      }
}

output { stdout {} }

Input and output:

Speed: 123
{
          "host" => "test3",
    "@timestamp" => 2022-03-29T08:21:05.944Z,
       "message" => "Speed: 123",
      "@version" => "1",
         "speed" => "123"
}
Duration: 123
{
          "host" => "test3",
      "duration" => "123",
    "@timestamp" => 2022-03-29T08:22:00.517Z,
       "message" => "Duration: 123",
      "@version" => "1"
}
123
{
          "host" => "test3",
    "@timestamp" => 2022-03-29T08:22:03.095Z,
       "message" => "123",
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

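The break_on_match option controls whether grok stops at the first pattern that matches when several patterns are given. It defaults to true; when set to false, grok tries every pattern.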
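
The effect of break_on_match can be modelled in a few lines of Python. This is only a sketch of the behaviour described above, not grok's implementation: plain regular expressions stand in for the two grok patterns, and match_all is a hypothetical helper name.

```python
import re

# Plain regexes standing in for "Duration: %{NUMBER:duration}"
# and "Speed: %{NUMBER:speed}".
PATTERNS = [
    r"Duration: (?P<duration>\d+(?:\.\d+)?)",
    r"Speed: (?P<speed>\d+(?:\.\d+)?)",
]


def match_all(message, patterns, break_on_match=True):
    """Try patterns in order; stop at the first hit unless break_on_match is False."""
    fields = {}
    for pattern in patterns:
        m = re.search(pattern, message)
        if m:
            fields.update(m.groupdict())
            if break_on_match:
                break
    return fields


line = "Duration: 12 Speed: 34"
print(match_all(line, PATTERNS))                        # {'duration': '12'}
print(match_all(line, PATTERNS, break_on_match=False))  # {'duration': '12', 'speed': '34'}
```

With the default, a line that contains both values only yields the first capture; turning the option off is what lets one event collect fields from several patterns.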

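Custom pattern matching

Use patterns_dir to point to a directory of custom patterns. Put the following in patterns/test:

# pattern name   regular expression
PHONE_NUMBER ^1[3-9]\d{9}$

Then reference the pattern in the pipeline configuration:

input { stdin {} }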

filter {
      grok {
        patterns_dir => ["./patterns"]
        match => { "message" => "%{PHONE_NUMBER:phone}" }
      }
}

output { stdout {} }

Input and output:

123
{
       "message" => "123",
      "@version" => "1",
          "host" => "test3",
    "@timestamp" => 2022-03-29T08:11:53.344Z,
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
13100000000
{
       "message" => "13100000000",
      "@version" => "1",
          "host" => "test3",
    "@timestamp" => 2022-03-29T08:11:59.463Z,
         "phone" => "13100000000"
}

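keep_empty_captures: keep empty captures

When a pattern is allowed to match an empty string, for example this custom pattern:

ANY .*

keep_empty_captures must be set to true for the empty value to be kept.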

input { stdin {} }

filter {
      grok {
        keep_empty_captures => true
        patterns_dir => ["./patterns"]
        match => {
          "message" => "test:%{ANY:test}"
        }
      }
}

output { stdout {} }

Input and output:

123
{
       "message" => "123",
    "@timestamp" => 2022-03-29T09:02:43.857Z,
          "host" => "test3",
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
test:123
{
       "message" => "test:123",
          "test" => "123",
    "@timestamp" => 2022-03-29T09:02:48.244Z,
          "host" => "test3",
      "@version" => "1"
}
test:
{
       "message" => "test:",
          "test" => "",  # kept
    "@timestamp" => 2022-03-29T09:02:51.964Z,
          "host" => "test3",
      "@version" => "1"
}
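
The difference the option makes can be sketched in Python. This is an illustrative model under the assumption that grok simply discards captures whose value is the empty string unless keep_empty_captures is true; the function name capture is hypothetical.

```python
import re


def capture(message, keep_empty_captures=False):
    """Match 'test:<anything>' and optionally discard empty captures."""
    m = re.search(r"test:(?P<test>.*)", message)
    if m is None:
        return None  # corresponds to the _grokparsefailure case
    fields = m.groupdict()
    if not keep_empty_captures:
        # Drop every capture whose value is the empty string.
        fields = {name: value for name, value in fields.items() if value != ""}
    return fields


print(capture("test:"))                            # {}
print(capture("test:", keep_empty_captures=True))  # {'test': ''}
```

Note that "test:" still counts as a match in both cases; only the empty field is dropped.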

target: store the captures under a target field

input { stdin {} }

filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
        target => info
      }
}

output { stdout {} }

Input and output:

55.3.244.1 GET /index.html 15824 0.043
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
    "@timestamp" => 2022-03-29T09:10:33.658Z,
          "info" => {
           "bytes" => "15824",
         "request" => "/index.html",
        "duration" => "0.043",
          "method" => "GET",
          "client" => "55.3.244.1"
    },
          "host" => "test3",
      "@version" => "1"
}

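overwrite: overwrite existing fields with the captures

By default overwrite is [], so no existing field is overwritten; a capture whose field already exists is appended to it instead.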

input { 
  stdin {
      add_field => { "info" => "default" }
  }
}

filter {
      grok {
        match => { "message" => "info: %{WORD:info}" }
        overwrite => [ "info" ]
      }
}

output { stdout {} }

Input and output:

123
{
    "@timestamp" => 2022-03-29T09:17:59.760Z,
       "message" => "123",
          "info" => "default",
          "host" => "test3",
          "tags" => [
        [0] "_grokparsefailure"
    ],
      "@version" => "1"
}
info: new_info
{
       "message" => "info: new_info",
    "@timestamp" => 2022-03-29T09:18:08.750Z,
          "info" => "new_info",
          "host" => "test3",
      "@version" => "1"
}

Without overwrite set:

info: 123 
{
      "@version" => "1",
    "@timestamp" => 2022-03-29T09:20:14.927Z,
       "message" => "info: 123",
          "info" => [
        [0] "default",
        [1] "123"
    ],
          "host" => "test3"
}
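
The append-versus-overwrite rule can be summarised with a small Python model. It is a sketch of the behaviour shown in the outputs above, with a plain dict standing in for the event; store_capture is a hypothetical name.

```python
def store_capture(event, field, value, overwrite=()):
    """Store a capture on the event: append by default, replace if listed in overwrite."""
    if field not in event or field in overwrite:
        event[field] = value                  # new field, or explicitly overwritten
    elif isinstance(event[field], list):
        event[field].append(value)            # already an array: append
    else:
        event[field] = [event[field], value]  # scalar becomes a two-element array
    return event


print(store_capture({"info": "default"}, "info", "123"))
print(store_capture({"info": "default"}, "info", "new_info", overwrite=["info"]))
```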

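mutate: modify fields

The mutate filter supports the operations below. The less obvious ones come with an example; for the rest, see the official documentation.

coerce: set a default value

The default is applied only when the field exists and its value is nil. If the field does not exist, no default is added.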

input { stdin {} }

filter {
  ruby { code => 'event.set("test1", nil)' } # create a field whose value is nil
  mutate { 
    coerce => {
      "test1" => "default value"  # test1 exists and is nil, so the default is applied
      "test2" => "default value"  # test2 does not exist, so no default is applied
    }
  }
}

output { stdout {} }

Input and output:

123
{
       "message" => "123",
    "@timestamp" => 2022-03-28T03:55:49.797Z,
         "test1" => "default value",
          "host" => "test3",
      "@version" => "1"
}
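
The coerce rule is easy to misread, so here is a minimal Python model of it. A dict stands in for the event and None for nil; this illustrates the rule stated above and is not the plugin's code.

```python
def coerce(event, defaults):
    """Apply a default only where the field exists and its value is None."""
    for field, default in defaults.items():
        if field in event and event[field] is None:
            event[field] = default
    return event


event = {"message": "123", "test1": None}
print(coerce(event, {"test1": "default value", "test2": "default value"}))
```

test2 is absent from the result because the field never existed on the event.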

rename: rename a field

input { stdin {} }

filter {
  mutate { 
    rename => { "message" => "new_message" }
  }
}

output { stdout {} }

Input and output:

123
{
    "new_message" => "123",
     "@timestamp" => 2022-03-28T04:00:13.845Z,
       "@version" => "1",
           "host" => "test3"
}

update: update a field

If the field does not exist, nothing happens.

input { stdin {} }

filter {
  mutate { 
    update => { "message" => "456" }
  }
}

output { stdout {} }

Input and output:

123
{
       "message" => "456",
      "@version" => "1",
    "@timestamp" => 2022-03-28T04:11:18.504Z,
          "host" => "test3"
}

replace: replace a field's value

The new value can reference other fields with %{foo}. If the field does not exist, it is added.

input { stdin {} }

filter {
  mutate { 
    replace => { "message" => "%{host}: %{message}" }
  }
}

output { stdout {} }

Input and output:

123
{
    "@timestamp" => 2022-03-28T04:16:36.036Z,
          "host" => "test3",
      "@version" => "1",
       "message" => "test3: 123"
}
456
{
    "@timestamp" => 2022-03-28T04:16:39.136Z,
          "host" => "test3",
      "@version" => "1",
       "message" => "test3: 456"
}
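
The difference between update and replace only shows up when the field is missing, a case the outputs above do not include. The following Python sketch models both, with a dict as the event. The sprintf helper is a hypothetical stand-in for %{field} substitution, and leaving unknown references untouched is an assumption of this sketch.

```python
import re


def sprintf(template, event):
    """Substitute %{field} references; unknown fields are left as written (assumed here)."""
    return re.sub(
        r"%\{(\w+)\}",
        lambda m: str(event.get(m.group(1), m.group(0))),
        template,
    )


def update(event, field, template):
    """Change the field only if it already exists."""
    if field in event:
        event[field] = sprintf(template, event)
    return event


def replace(event, field, template):
    """Set the field whether or not it exists."""
    event[field] = sprintf(template, event)
    return event


print(replace({"host": "test3", "message": "123"}, "message", "%{host}: %{message}"))
print(update({"host": "test3"}, "message", "456"))    # unchanged, no message field
print(replace({"host": "test3"}, "message", "456"))   # message is added
```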

convert: type conversion

input { stdin {} }

filter {
  mutate { 
    convert => { "message" => float }
  }
}

output { stdout {} }

Input and output:

123
{
       "message" => 123.0,
      "@version" => "1",
          "host" => "test3",
    "@timestamp" => 2022-03-28T04:22:27.110Z
}

gsub: replace text within a value

input { stdin {} }

filter {
  mutate { 
    gsub => [
        "message", "[-_/]", "."
    ] 
  }
}

output { stdout {} }

Input and output:

2022/01/01 
{
       "message" => "2022.01.01",
    "@timestamp" => 2022-03-28T06:32:02.663Z,
          "host" => "test3",
      "@version" => "1"
}
2022-01-01
{
       "message" => "2022.01.01",
    "@timestamp" => 2022-03-28T06:32:08.917Z,
          "host" => "test3",
      "@version" => "1"
}
2022_01_01
{
       "message" => "2022.01.01",
    "@timestamp" => 2022-03-28T06:32:17.842Z,
          "host" => "test3",
      "@version" => "1"
}
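
The gsub setting is a flat array read three items at a time: field name, regular expression, replacement. A rough Python equivalent, using re.sub in place of the Ruby substitution that Logstash performs (the two regex dialects differ in details), could look like this; the function name gsub is just for illustration.

```python
import re


def gsub(event, rules):
    """rules is a flat list read three items at a time: field, regex, replacement."""
    for i in range(0, len(rules), 3):
        field, pattern, replacement = rules[i:i + 3]
        if isinstance(event.get(field), str):
            event[field] = re.sub(pattern, replacement, event[field])
    return event


for text in ("2022/01/01", "2022-01-01", "2022_01_01"):
    print(gsub({"message": text}, ["message", "[-_/]", "."]))
```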

uppercase: convert to upper case

input { stdin {} }

filter {
  mutate { 
        uppercase => [ "message" ]
  }
}

output { stdout {} }

Input and output:

abcd
{
    "@timestamp" => 2022-03-28T06:34:18.980Z,
       "message" => "ABCD",
          "host" => "test3",
      "@version" => "1"
}

capitalize: capitalize the first letter

input { stdin {} }

filter {
  mutate { 
        capitalize => [ "message" ]
  }
}

output { stdout {} }

Input and output:

abcd
{
      "@version" => "1",
          "host" => "test3",
       "message" => "Abcd",
    "@timestamp" => 2022-03-28T06:35:43.118Z
}

lowercase: convert to lower case

input { stdin {} }

filter {
  mutate { 
    lowercase => [ "message" ]
  }
}

output { stdout {} }

Input and output:

ABCD
{
      "@version" => "1",
          "host" => "test3",
    "@timestamp" => 2022-03-28T06:36:58.364Z,
       "message" => "abcd"
}

strip: remove leading and trailing whitespace

input { stdin {} }

filter {
  mutate {
      strip => ["message"]
  }
}

output { stdout {} }

Input and output:

123  # leading whitespace
{
          "host" => "test3",
      "@version" => "1",
       "message" => "123",
    "@timestamp" => 2022-03-28T06:39:45.745Z
}
123    # trailing whitespace
{
          "host" => "test3",
      "@version" => "1",
       "message" => "123",
    "@timestamp" => 2022-03-28T06:39:51.762Z
}
123  123  # whitespace in the middle
{
          "host" => "test3",
      "@version" => "1",
       "message" => "123  123",
    "@timestamp" => 2022-03-28T06:40:42.050Z
}
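
The three cases above match what Python's str.strip does, which makes for a compact illustration: whitespace at either end is removed, whitespace in the middle is kept. The function below is a sketch with a dict as the event, not the plugin's code.

```python
def strip(event, fields):
    """Trim whitespace at both ends of each listed string field."""
    for field in fields:
        if isinstance(event.get(field), str):
            event[field] = event[field].strip()
    return event


for text in ("  123", "123    ", "123  123"):
    print(strip({"message": text}, ["message"]))
```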

remove: remove fields and tags

remove_field removes fields
remove_tag removes tags

input { stdin {} }

filter {
  mutate {
     remove_field => ["message"]
     remove_tag => ["json"]
  }
}

output { stdout {} }

Input and output:

123
{
      "@version" => "1",
    "@timestamp" => 2022-03-28T06:45:27.784Z,
          "host" => "test3"
}

split: split a field into an array

input { stdin {} }

filter {
  mutate { 
         split => { "message" => "," }
  }
}

output { stdout {} }

Input and output:

1,2,3
{
    "@timestamp" => 2022-03-28T06:49:44.231Z,
       "message" => [
        [0] "1",
        [1] "2",
        [2] "3"
    ],
          "host" => "test3",
      "@version" => "1"
}

join: join an array field into a string

input {
  stdin {
    add_field => { "test_array" => ["1", "2", "3"]}
  }

}

filter {
  mutate {
      join => { "test_array" => "," }
  }
}

output { stdout {} }

Input and output:

123
{
    "test_array" => "1,2,3",
          "host" => "test3",
       "message" => "123",
    "@timestamp" => 2022-03-28T06:55:03.926Z,
      "@version" => "1"
}

merge: merge fields

Merging two strings produces an array.

input {
  stdin {
    add_field => { "test1" => "1111"}
    add_field => { "test2" => "2222"}
  }

}

filter {
  mutate {
      merge => { "test2" => "test1" }
  }
}

output { stdout {} }

Input and output:

{
         "test2" => [
        [0] "2222",
        [1] "1111"
    ],
      "@version" => "1",
         "test1" => "1111",
    "@timestamp" => 2022-03-28T06:58:01.010Z,
       "message" => "",
          "host" => "test3"
}
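
In merge => { "test2" => "test1" } the key is the destination and the value is the source, and the source field is left in place. A minimal Python model of that, assuming both fields exist and hold strings or arrays (how duplicate values, hashes, or missing fields are handled is not covered by this sketch):

```python
def as_list(value):
    """Wrap a scalar in a list; copy an existing list."""
    return list(value) if isinstance(value, list) else [value]


def merge(event, dest, src):
    """Append the values of src to dest; dest ends up as a list, src is untouched."""
    event[dest] = as_list(event[dest]) + as_list(event[src])
    return event


print(merge({"test1": "1111", "test2": "2222"}, "test2", "test1"))
```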

copy: copy a field

input { stdin {} }

filter {
  mutate { 
      copy => { "message" => "message_copy" }
  }
}

output { stdout {} }

Input and output:

123
{
         "message" => "123",
        "@version" => "1",
            "host" => "test3",
      "@timestamp" => 2022-03-28T07:00:00.966Z,
    "message_copy" => "123"
}

drop: discard events

percentage: drop probability

Drop 50% of events:

input { stdin {} }

filter {
    drop {
      percentage => 50
    }
}

output { stdout {} }

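About 50% of events are dropped. The percentage is a per-event probability, so an event being dropped says nothing about the next one, which may be dropped as well:

1  # input 1: dropped
2  # input 2: dropped
3  # input 3: passed through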
{
          "host" => "test3",
    "@timestamp" => 2022-03-25T08:21:23.656Z,
       "message" => "3",
      "@version" => "1"
}
4  # input 4: passed through
{
          "host" => "test3",
    "@timestamp" => 2022-03-25T08:21:25.729Z,
       "message" => "4",
      "@version" => "1"
}
5  # input 5: dropped
6  # input 6: passed through
{
          "host" => "test3",
    "@timestamp" => 2022-03-25T08:21:33.179Z,
       "message" => "6",
      "@version" => "1"
}
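
Because each event is decided independently, short runs like the one above can look uneven while the long-run share still settles near the configured percentage. The simulation below illustrates this in Python; should_drop is a hypothetical helper and the fixed seed is there only to make the run repeatable.

```python
import random


def should_drop(percentage, rng):
    """Decide independently for each event; earlier decisions are not remembered."""
    return rng.random() * 100 < percentage


rng = random.Random(0)  # fixed seed only to make the sketch repeatable
events = list(range(10000))
kept = [e for e in events if not should_drop(50, rng)]
print(len(kept) / len(events))  # close to 0.5, though rarely exactly 0.5
```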

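geoip: look up information by IP address

The geoip filter enriches an event with details derived from an IP address, such as its geographic location.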

input { stdin {} }

filter {
    geoip{
        source => message
    }
}

output { stdout {} }

Input and output:

1  # lookup failed
{
          "host" => "freezej",
      "@version" => "1",
       "message" => "1",
    "@timestamp" => 2022-03-30T02:39:10.560Z,
         "geoip" => {},
          "tags" => [
        [0] "_geoip_lookup_failure"
    ]
}
114.114.114.114
{
          "host" => "freezej",
       "message" => "114.114.114.114",
    "@timestamp" => 2022-03-30T02:39:51.128Z,
      "@version" => "1",
         "geoip" => {
             "longitude" => 113.722,
        "continent_code" => "AS",
              "latitude" => 34.7732,
              "location" => {
            "lon" => 113.722,
            "lat" => 34.7732
        },
         "country_code3" => "CN",
         "country_code2" => "CN",
          "country_name" => "China",
              "timezone" => "Asia/Shanghai",
                    "ip" => "114.114.114.114"
    }
}
8.8.8.8
{
          "host" => "freezej",
       "message" => "8.8.8.8",
    "@timestamp" => 2022-03-30T02:40:05.472Z,
      "@version" => "1",
         "geoip" => {
             "longitude" => -97.822,
        "continent_code" => "NA",
              "latitude" => 37.751,
              "location" => {
            "lon" => -97.822,
            "lat" => 37.751
        },
         "country_code3" => "US",
         "country_code2" => "US",
          "country_name" => "United States",
              "timezone" => "America/Chicago",
                    "ip" => "8.8.8.8"
    }
}

Check the GeoIP database update status

curl -XGET 'localhost:9600/_node/stats/geoip_download_manager?pretty'
{
  "host" : "freezej",
  "version" : "7.14.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "0a4c5c86-3103-471c-b3e8-e66fd8aaeb17",
  "name" : "freezej",
  "ephemeral_id" : "eafe5e1d-9ee2-421b-8a62-fab3d5023180",
  "status" : "green",
  "snapshot" : false,
  "pipeline" : {
    "workers" : 1,
    "batch_size" : 125,
    "batch_delay" : 50
  },
  "geoip_download_manager" : {
    "download_stats" : {
      "last_checked_at" : "2022-03-30T10:51:08+08:00",
      "failures" : 0,
      "status" : "succeeded",
      "successes" : 1
    },
    "database" : {
      "ASN" : {
        "status" : "up_to_date",
        "fail_check_in_days" : 0,
        "last_updated_at" : "2022-03-30T10:38:09+08:00"
      },
      "City" : {
        "status" : "up_to_date",
        "fail_check_in_days" : 0,
        "last_updated_at" : "2022-03-30T10:38:09+08:00"
      }
    }
  }
}

date: parse dates

Parses the time held in one field and stores the result in another field.

input { stdin {} }

filter {
    date {
      tag_on_failure => [ "match_fail" ]  # tag added on failure; the default is "_dateparsefailure"
      match => [ 
                 "message",               # field to parse
                 "ISO8601",               # format 1 (ISO 8601): 2015-01-01T01:12:23
                 "UNIX",                  # format 2 (epoch seconds): 1326149001
                 "UNIX_MS",               # format 3 (epoch milliseconds): 1366125117000
                 "yyyy年MM月dd日HH时mm分ss秒"   # format 4 (custom): 2022年3月15日15时00分00秒
               ]
      target => "match_result"            # field that receives the result; the default is "@timestamp"
      timezone => "Asia/Shanghai"         # time zone of the input data
      locale => "zh-cn"                   # locale of the input data
    }
}

output { stdout {} }

Input and output:

test  # no format matches
{
      "@version" => "1",
    "@timestamp" => 2022-03-25T09:14:13.621Z,
          "host" => "test3",
       "message" => "test",
          "tags" => [
        [0] "match_fail"
    ]
}
2015-01-01T01:12:23  # ISO8601
{
        "@version" => "1",
      "@timestamp" => 2022-03-25T09:14:19.060Z,
            "host" => "test3",
    "match_result" => 2014-12-31T17:12:23.000Z,
         "message" => "2015-01-01T01:12:23"
}
1326149001  # epoch seconds
{
        "@version" => "1",
      "@timestamp" => 2022-03-25T09:14:23.102Z,
            "host" => "test3",
    "match_result" => 2012-01-09T22:43:21.000Z,
         "message" => "1326149001"
}
1366125117000  # epoch milliseconds
{
        "@version" => "1",
      "@timestamp" => 2022-03-25T09:14:27.302Z,
            "host" => "test3",
    "match_result" => 2013-04-16T15:11:57.000Z,
         "message" => "1366125117000"
}
2022年3月15日15时00分00秒  # custom format
{
        "@version" => "1",
      "@timestamp" => 2022-03-25T09:14:31.134Z,
            "host" => "test3",
    "match_result" => 2022-03-15T07:00:00.000Z,
         "message" => "2022年3月15日15时00分00秒"
}
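
The results above are all in UTC: wall-clock inputs are read as Asia/Shanghai time and shifted by eight hours, while epoch inputs are already absolute. The Python sketch below reproduces the same conversions. Several things in it are assumptions made for illustration: Asia/Shanghai is modelled as a fixed UTC+8 offset, a single strptime format stands in for ISO8601, and values too large to fit a 32-bit epoch-seconds count fall through to the millisecond parser. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset stands in for Asia/Shanghai.
SHANGHAI = timezone(timedelta(hours=8))


def wall_clock(fmt):
    """Parser for zone-less text, interpreted in the configured time zone."""
    def parse(text):
        local = datetime.strptime(text, fmt).replace(tzinfo=SHANGHAI)
        return local.astimezone(timezone.utc)
    return parse


def epoch(divisor, limit):
    """Parser for epoch numbers; rejects values above limit (a heuristic of this sketch)."""
    def parse(text):
        value = int(text)  # raises ValueError for non-numeric input
        if value > limit:
            raise ValueError("out of range")
        return datetime.fromtimestamp(value / divisor, tz=timezone.utc)
    return parse


# Tried in order, like the formats listed after the field name in match.
PARSERS = [
    wall_clock("%Y-%m-%dT%H:%M:%S"),       # stands in for ISO8601
    epoch(1, 2**31 - 1),                   # UNIX
    epoch(1000, (2**31 - 1) * 1000),       # UNIX_MS
    wall_clock("%Y年%m月%d日%H时%M分%S秒"),  # custom format
]


def parse_to_utc(text):
    """Return the first successful parse as a UTC datetime, or None."""
    for parse in PARSERS:
        try:
            return parse(text)
        except ValueError:
            continue
    return None


for sample in ("2015-01-01T01:12:23", "1326149001", "1366125117000",
               "2022年3月15日15时00分00秒", "test"):
    print(sample, "->", parse_to_utc(sample))
```

A None result corresponds to the case where the event is tagged with match_fail.
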
Tags: ELK