Logstash的简单介绍

logstash的简介

logstash的概念:是一款开源的数据收集引擎,具有实时管道处理能力。
logstash具有200多个插件,可以接受各种各样的数据(如日志、网络请求、关系型数据库、传感器或物联网等等)
Logstash工作过程:
Logstash 就像管道符一样,读取输入数据,然后处理过滤数据,最后输出到目的地(数据在线程之间以事件的形式流传)
logstash数据处理过程包括三个部分:input、filter、output
input和output部分可以使用codecs对数据格式进行处理
Input,Filter,Output和codec:都是以插件形式存在
用户可以通过设置pipeline配置文件,设置符合需求的input、filter、outer、codecs插件实现对指定数据的采集、处理和输出功能

logstash的三大部分的介绍:
input:从数据源获取数据。不同的数据源使用不同的插件。常用的插件有:file、jdbc、Redis、syslog、beats 、http等
filter:用于处理数据。对从数据源获取到的数据按照需求进行处理(如:解析数据、删除字段、类型转换等)。常用的组件有:date、grok、dessect、mutate、json、geoip、ruby等
output:用于将数据输出到目的地。不同的目的地使用不同的插件。常用的插件有:elaticsearch、file、graphite、statsd等
codec:用于对数据进行编码解码。不是一个单独的流程,是用于input和output部分对数据进行编解码作用。常见的组件有:json、multiline等
logstash不是一个input-filter-output的数据流,而是一个 input | decode | filter | encode | output 的数据流。

logstash将数据转换为事件时候,会给事件添加一些额外的信息。下面介绍几个常见的额为信息:
@timestamp:用来标记事件的发生时间
host:标记事件发生地址
type:标记事件的唯一类型(input和output部分都可以配置多个不同的插件,每个插件可以用type来唯一标记这个插件,可以实现对不同的插件进行不同的处理)
tags:标记事件的某方面属性。这是一个数组,一个事件可以有多个标签

Logstash中的数据类型:
bool:use_column_value => true
string:jdbc_driver_class => “com.mysql.jdbc.Driver”
number:jdbc_page_size => 50000
array:hosts => [“192.168.57.100:9200″,”192.168.57.101:9200″,”192.168.57.102:9200”]
hash:options =>{key1 =>value1,key2 =>value2}

logastah中的逻辑运算符:
相等关系:==、!=、<、>、<=、>=
正则关系:=~(匹配正则)、!~(不匹配正则)
包含关系:in、not in
布尔操作:and(与)、or(或)、nand(非与)、xor(非或)
一元运算符:!(取反)、()(复合表达式)、!() (对复合表达式结果取反)

三大模块插件的简单介绍

input

标准输入插件:表示从控制台输入

input {stdin { }}

file插件:表示从文件读取数据(如日志文件)

input {
    file {
        # 要导入的文件的位置,
        path => "/var/lib/mysql/slow.log"            
        # 从文件开始的位置开始读,end表示从结尾开始读
        start_position => "beginning"
        # 多久之内没修改过的文件不读取,0为无限制,单位为秒
        ignore_older => 0  
        # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
        sincedb_path => "/dev/null"
        # type字段,可表明导入的日志类型
        type => "mysql-slow"
    }
}

filter

Filter模块是logstash功能强大的主要原因,可以对Logstash Event进行丰富的处理(比如解析数据、删除字段、类型转换等等)
常见的Filter插件如下:

插件名称 说明
date 日期解析
grok 正则匹配解析
dessect 分隔符解析
mutate 对字段做处理,比如删除、重命名、替换
json 按照Json格式解析字段内容到指定字段中
geoip 将ip地址进行解析获取地域信息(包括国别,省市,经纬度等)
ruby 利用ruby代码来动态修改logstash event

Date插件:可以将日期字符串解析为日期类型,然后替换@timestamp字段或者指定成其他字段(如果不指定为@timestamp字段,会使用系统的时间作为@timestamp的值)
将input中的event_date字段的字符串解析日期格式,将解析后的的数据作为@timestamp的值;event_date的日期字符串格式需要和后面的yyyy-MM-dd HH:mm:ss对应
如:2020/12/31 12:23:32  对应的格式为match =>[“event_date”,”yyyy/MM/dd HH:mm:ss”]

#将event_date字符串解析为日期类型,并将解析后的值作为@timestamp的值
filter { date{ match
=>["event_date","yyyy-MM-dd HH:mm:ss"] target =>"@timestamp" } }

Grok插件:基于正则解析数据
grok是filter最重要的插件,grok使用正则表达式来生成grok语法,将input的数据解析成需要的数据格式
grok中常用patterns的路径:logstash_filevendorundlejrubyx.xgemslogstash-patterns-core-x.x.xpatternsgrok-patterns
grok语法:%{SYNTAX:SEMANTIC}
SYNTAX:grok pattern的名称
SEMANTIC:赋值字段名称(将匹配到的值赋值给SEMANTIC这个字段)
如:%{NUMBER:field}:可以匹配数值类型,但是grok匹配出的内容都是字符串类型,可以通过在最后指定为int或者float来强转类型:%{NUMBER:field:int}
实例:控制台输入:hello 123456 666666

filter{
    grok{
        match => {
            "message" => "hello %{NUMBER:num_file:int} %{NUMBER:number_file}"
        }
    }
}

自定义正则表达式:
例子:将匹配到的值赋值给num_file这个字段:(?<num_file>[0-9]{6})
控制台输入:hello 123456 666666

filter{
    grok{
        match => {
            "message" => "hello (?<num_file>[0-9]{6}) %{NUMBER:number_file}"
        }
    }
}

自定义grok pattern:通过pattern_definitions参数,以键值对的方式定义pattern名称和内容(也可以通过pattern_dir参数,以文件的形式读取pattern)
例子:定义模板的名称为pattern_test,模板的内容是正则匹配8位的数字或者字母;并且使用了自定义的pattern
控制台输入:hello 1234abcd

filter{
    grok{
        match => {
            "message" => "%{pattern_test:number_file}"
        }
        pattern_definitions => {
            "pattern_test" => "hello [a-z0-9]{8}"
        }
    }
}

Dissect插件:基于分隔符原理解析数据
解决grok解析时消耗过多cpu资源的问题
语法简单,能处理的场景比较有限
它只能处理格式相似,且有分隔符的字符串
语法如下:
%{}里面是字段;字段名称可以自己定义
两个%{}之间是分隔符

例子:把日期和时间解析到同一个字段datetime_flie
控制台输入:Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool

filter{
    dissect{
        mapping => {
            "message" => "%{datetime_flie} %{+datetime_flie} %{+datetime_flie} %{local_host} %{sys_file}[%{pid}]: %{message_flle}"
        }
    }
}

Mutate插件:是使用最频繁的插件,可以对字段进行各种操作(比如重命名、删除、替换、更新等)
convert:类型转换
gsub字符串替换
split、join、merge字符串切割、数组合并为字符串、数组合并为数组
rename字段重命名
update、replace字段内容更新或替换。它们都可以更新字段的内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段的操作
remove_field删除字段
例子:对数据进行处理分割、添加字段和删除字段
控制台输入:hello Weiking 20200810 man

filter{
    mutate{
        split => ["message", " "]
        add_field => {
            "field_name" => "%{[message][0]}"
            "name" => "%{[message][1]}"
            "date" => "%{[message][2]}"
            "sex" => "%{[message][3]}"
        }
        remove_field => "sex"
    }
}

Json插件:将指定字段的json格式的字符串进行解析,如果不指定target,会把解析出来的json数据直接放到根级别,可以通过target将解析后的json数据放到指定的字段中
例子:将json格式的字符串解析放在trans_all字段中
控制台输入:{“trans_jnls_no”:”0″,”card_no”:”623061571015020279″,”terminal_no”:”zhanglq”,”atm_trans_types_type”:””,”customer_no”:”106913609″,”cash_mark”:”212″}

filter{
    json{
        source => "message"
        target => "trans_all"
        remove_field => "message"
    }
}

Geoip插件:基于Geoip库解析IP数据获取地域信息
GeoIP库:可以根据IP地址提供对应的地域信息(包括国别,省市,经纬度等)
例子:解析IP地址获取地域信息
控制台输入:124.160.75.205

filter{
    geoip{
        source => "message"
        remove_field => "message"
    }
}    

output

标准输出插件:多用于调试,从控制台输出

output {
    stdout {
        codec => rubydebug
    }
} 

elasticsearch插件:将数据输入到elaticsearch中

output {
    elasticsearch {
        #es的ip和端口
        hosts => ["http://192.168.57.100:9200"] 
        #集群:hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"]        
        #ES索引名称(自己定义的)
        index => "mdl_001_202008"
        #文档类型
        document_type => "MdL_001"
        #设置数据的id为数据库中的字段
        document_id => "%{risk_event_no}-%{rule_id}"
    }
}

logstash配置实例

logstash的配置的文件:input和output都可以配置多个不同的插件,filter可以针对input里面的每个数据源做不一样的过滤,通过各自定义的type来匹配
配置实例:

input{
    #从控制台输入数据
    stdin{ 
        type => "stdin"
    }  
    #读取文件中的数据
    file {
        # 要导入的文件的位置,
        path => "/opt/data/data.json"            
        # 从文件开始的位置开始读,end表示从结尾开始读
        start_position => "beginning"
        # 多久之内没修改过的文件不读取,0为无限制,单位为秒
        ignore_older => 0  
        # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
        sincedb_path => "/dev/null"
        # type字段,可表明导入的日志类型
        type => "file"
    }
}
filter{
        if[type] == "stdin"{
            mutate{
                rename => ["message","timestamp"]
            }
        }
        if[type] == "file"{
            json{
                source => "message"
            }
        }
}
output {
        if[type] == "file"{
          elasticsearch {
                #es的ip和端口
                hosts => ["http://192.168.57.100:9200"] 
                #集群:hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"]        
                #ES索引名称(自己定义的)
                index => "mdl_001_20200812"
                #文档类型
                document_type => "MdL_012"
                #设置数据的id为数据库中的字段
                document_id => "%{risk_event_no}"
            }

        }
        if[type] == "stdin"{
            stdout{
                codec => rubydebug
            }
        }
} 

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注