Logstash 中 ruby 过滤器插件的使用

在 logstash.conf 配置中，可以通过 ruby 过滤器插件执行一段 Ruby 代码，在事件处理过程中动态读取并修改事件的字段数据。

filter {
    # Discard events whose type is deployment.
    if [type] == "deployment" {
        drop {}
    }

    # Remove the kafka field from every remaining event.
    mutate {
        remove_field => ["kafka"]
    }

    # Store a date string, shifted 8 hours ahead of @timestamp, in the localtime field.
    ruby {
        code => "
        # Read @timestamp and add the 8 hour offset (28800 seconds).
        shifted = event.get('@timestamp').time + 28800
        # Save the shifted time formatted as YYYY.MM.DD.
        event.set('localtime', shifted.strftime('%Y.%m.%d'))
        "
    }
}
# Second filter stage: parse an access-log line with grok, keep only successfully
# parsed HTTP 200 requests, base64-decode the query payload, expand it into
# key/value fields and finally strip the intermediate fields.
filter {
    grok {
        # Earlier variants of the pattern, kept for reference.
        #match => {"message"=>'(?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?(\s+)|-) (?.*) "(?.*?) (?.*?)\?d=(?.*?) (?\S+)" (?\d+) (?\d+) "(?.*?)" "(?.*?)" "(?.*?)'}
        #match => {"message"=>'(?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?(\s+)|-) (?.*) "(?.*?) (?.*?)\?d=(?.*?) (?\S+)" (?\d+) (?\d+) "(?.*?)" "(?.*?)".*?'}
        # Every capture group must be named with (?<name>...) so that grok emits
        # the fields used below (status, string, timelocal, ...).
        # NOTE(review): the group names were restored from the fields referenced
        # later in this filter and in the remove_field lists - confirm against
        # the original log format.
        match => {"message"=>'(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?<remote_user>(\s+)|-) \[(?<timelocal>.*)\] "(?<method>.*?) (?<uri>.*?)\?(d=)?(?<string>.*?) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>.*?)" "(?<http_user_agent>.*?)".*?'}
    }

    # Drop every event that carries tags (grok adds a failure tag when the
    # pattern does not match).
    if [tags] {
        drop {}
    }

    # Only requests answered with status 200 are kept.
    if [status] != "200" {
        drop {}
    }

    # Base64-decode the captured query payload into b64_decode; an absent
    # payload or a decoding error yields an empty string.
    ruby {
        init => "require 'base64'"
        code => "
        string = event.get('string')
        if string
            begin
                # decode64 is lenient about malformed input, so the rescue
                # below is only a safety net.
                b64 = Base64.decode64(string).force_encoding('utf-8')
                #puts b64, event.get('message')
                event.set('b64_decode', b64)
            rescue ArgumentError
                event.set('b64_decode', '')
            end
        else
            event.set('b64_decode', '')
        end
        "
    }

    # Drop events whose payload decoded to nothing. The field reference must be
    # closed before the comparison: [b64_decode] == "".
    if [b64_decode] == "" {
        drop {}
    }

    # Split the decoded payload into fields: pairs are separated by & or ?,
    # keys and values by =.
    kv {
        source => "b64_decode"
        field_split => "&?"
        value_split => "="
    }

    if [type] == "template" {
        # Template events also lose @timestamp along with the intermediate fields.
        mutate {
            remove_field => ["@timestamp", "@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
        }
    } else {
        # Use the request time from the log line as the event timestamp.
        date {
            match => ["timelocal", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }

        # Remove the intermediate fields, keeping @timestamp.
        mutate {
            remove_field => ["@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
        }
    }

}