logstash的简介
logstash的概念:是一款开源的数据收集引擎,具有实时管道处理能力。
logstash具有200多个插件,可以接受各种各样的数据(如日志、网络请求、关系型数据库、传感器或物联网等等)
Logstash工作过程:
Logstash 就像管道符一样,读取输入数据,然后处理过滤数据,最后输出到目的地(数据在线程之间以事件的形式流传)
logstash数据处理过程包括三个部分:input、filter、output
input和output部分可以使用codecs对数据格式进行处理
Input,Filter,Output和codec:都是以插件形式存在
用户可以通过设置pipeline配置文件,设置符合需求的input、filter、outer、codecs插件实现对指定数据的采集、处理和输出功能
logstash的三大部分的介绍:
input:从数据源获取数据。不同的数据源使用不同的插件。常用的插件有:file、jdbc、Redis、syslog、beats 、http等
filter:用于处理数据。对从数据源获取到的数据按照需求进行处理(如:解析数据、删除字段、类型转换等)。常用的组件有:date、grok、dessect、mutate、json、geoip、ruby等
output:用于将数据输出到目的地。不同的目的地使用不同的插件。常用的插件有:elaticsearch、file、graphite、statsd等
codec:用于对数据进行编码解码。不是一个单独的流程,是用于input和output部分对数据进行编解码作用。常见的组件有:json、multiline等
logstash不是一个input-filter-output的数据流,而是一个 input | decode | filter | encode | output 的数据流。
logstash将数据转换为事件时候,会给事件添加一些额外的信息。下面介绍几个常见的额为信息:
@timestamp:用来标记事件的发生时间
host:标记事件发生地址
type:标记事件的唯一类型(input和output部分都可以配置多个不同的插件,每个插件可以用type来唯一标记这个插件,可以实现对不同的插件进行不同的处理)
tags:标记事件的某方面属性。这是一个数组,一个事件可以有多个标签
Logstash中的数据类型:
bool:use_column_value => true
string:jdbc_driver_class => “com.mysql.jdbc.Driver”
number:jdbc_page_size => 50000
array:hosts => [“192.168.57.100:9200″,”192.168.57.101:9200″,”192.168.57.102:9200”]
hash:options =>{key1 =>value1,key2 =>value2}
logastah中的逻辑运算符:
相等关系:==、!=、<、>、<=、>=
正则关系:=~(匹配正则)、!~(不匹配正则)
包含关系:in、not in
布尔操作:and(与)、or(或)、nand(非与)、xor(非或)
一元运算符:!(取反)、()(复合表达式)、!() (对复合表达式结果取反)
三大模块插件的简单介绍
input
标准输入插件:表示从控制台输入
input {stdin { }}
file插件:表示从文件读取数据(如日志文件)
input { file { # 要导入的文件的位置, path => "/var/lib/mysql/slow.log" # 从文件开始的位置开始读,end表示从结尾开始读 start_position => "beginning" # 多久之内没修改过的文件不读取,0为无限制,单位为秒 ignore_older => 0 # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析 sincedb_path => "/dev/null" # type字段,可表明导入的日志类型 type => "mysql-slow" } }
filter
Filter模块是logstash功能强大的主要原因,可以对Logstash Event进行丰富的处理(比如解析数据、删除字段、类型转换等等)
常见的Filter插件如下:
插件名称 | 说明 |
date | 日期解析 |
grok | 正则匹配解析 |
dessect | 分隔符解析 |
mutate | 对字段做处理,比如删除、重命名、替换 |
json | 按照Json格式解析字段内容到指定字段中 |
geoip | 将ip地址进行解析获取地域信息(包括国别,省市,经纬度等) |
ruby | 利用ruby代码来动态修改logstash event |
Date插件:可以将日期字符串解析为日期类型,然后替换@timestamp字段或者指定成其他字段(如果不指定为@timestamp字段,会使用系统的时间作为@timestamp的值)
将input中的event_date字段的字符串解析日期格式,将解析后的的数据作为@timestamp的值;event_date的日期字符串格式需要和后面的yyyy-MM-dd HH:mm:ss对应
如:2020/12/31 12:23:32 对应的格式为match =>[“event_date”,”yyyy/MM/dd HH:mm:ss”]
#将event_date字符串解析为日期类型,并将解析后的值作为@timestamp的值
filter { date{ match =>["event_date","yyyy-MM-dd HH:mm:ss"] target =>"@timestamp" } }
Grok插件:基于正则解析数据
grok是filter最重要的插件,grok使用正则表达式来生成grok语法,将input的数据解析成需要的数据格式
grok中常用patterns的路径:logstash_filevendorundlejrubyx.xgemslogstash-patterns-core-x.x.xpatternsgrok-patterns
grok语法:%{SYNTAX:SEMANTIC}
SYNTAX:grok pattern的名称
SEMANTIC:赋值字段名称(将匹配到的值赋值给SEMANTIC这个字段)
如:%{NUMBER:field}:可以匹配数值类型,但是grok匹配出的内容都是字符串类型,可以通过在最后指定为int或者float来强转类型:%{NUMBER:field:int}
实例:控制台输入:hello 123456 666666
filter{ grok{ match => { "message" => "hello %{NUMBER:num_file:int} %{NUMBER:number_file}" } } }
自定义正则表达式:
例子:将匹配到的值赋值给num_file这个字段:(?<num_file>[0-9]{6})
控制台输入:hello 123456 666666
filter{ grok{ match => { "message" => "hello (?<num_file>[0-9]{6}) %{NUMBER:number_file}" } } }
自定义grok pattern:通过pattern_definitions参数,以键值对的方式定义pattern名称和内容(也可以通过pattern_dir参数,以文件的形式读取pattern)
例子:定义模板的名称为pattern_test,模板的内容是正则匹配8位的数字或者字母;并且使用了自定义的pattern
控制台输入:hello 1234abcd
filter{ grok{ match => { "message" => "%{pattern_test:number_file}" } pattern_definitions => { "pattern_test" => "hello [a-z0-9]{8}" } } }
Dissect插件:基于分隔符原理解析数据
解决grok解析时消耗过多cpu资源的问题
语法简单,能处理的场景比较有限
它只能处理格式相似,且有分隔符的字符串
语法如下:
%{}里面是字段;字段名称可以自己定义
两个%{}之间是分隔符
例子:把日期和时间解析到同一个字段datetime_flie
控制台输入:Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool
filter{ dissect{ mapping => { "message" => "%{datetime_flie} %{+datetime_flie} %{+datetime_flie} %{local_host} %{sys_file}[%{pid}]: %{message_flle}" } } }
Mutate插件:是使用最频繁的插件,可以对字段进行各种操作(比如重命名、删除、替换、更新等)
convert:类型转换
gsub字符串替换
split、join、merge字符串切割、数组合并为字符串、数组合并为数组
rename字段重命名
update、replace字段内容更新或替换。它们都可以更新字段的内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段的操作
remove_field删除字段
例子:对数据进行处理分割、添加字段和删除字段
控制台输入:hello Weiking 20200810 man
filter{ mutate{ split => ["message", " "] add_field => { "field_name" => "%{[message][0]}" "name" => "%{[message][1]}" "date" => "%{[message][2]}" "sex" => "%{[message][3]}" } remove_field => "sex" } }
Json插件:将指定字段的json格式的字符串进行解析,如果不指定target,会把解析出来的json数据直接放到根级别,可以通过target将解析后的json数据放到指定的字段中
例子:将json格式的字符串解析放在trans_all字段中
控制台输入:{“trans_jnls_no”:”0″,”card_no”:”623061571015020279″,”terminal_no”:”zhanglq”,”atm_trans_types_type”:””,”customer_no”:”106913609″,”cash_mark”:”212″}
filter{ json{ source => "message" target => "trans_all" remove_field => "message" } }
Geoip插件:基于Geoip库解析IP数据获取地域信息
GeoIP库:可以根据IP地址提供对应的地域信息(包括国别,省市,经纬度等)
例子:解析IP地址获取地域信息
控制台输入:124.160.75.205
filter{ geoip{ source => "message" remove_field => "message" } }
output
标准输出插件:多用于调试,从控制台输出
output { stdout { codec => rubydebug } }
elasticsearch插件:将数据输入到elaticsearch中
output { elasticsearch { #es的ip和端口 hosts => ["http://192.168.57.100:9200"] #集群:hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"] #ES索引名称(自己定义的) index => "mdl_001_202008" #文档类型 document_type => "MdL_001" #设置数据的id为数据库中的字段 document_id => "%{risk_event_no}-%{rule_id}" } }
logstash配置实例
logstash的配置的文件:input和output都可以配置多个不同的插件,filter可以针对input里面的每个数据源做不一样的过滤,通过各自定义的type来匹配
配置实例:
input{ #从控制台输入数据 stdin{ type => "stdin" } #读取文件中的数据 file { # 要导入的文件的位置, path => "/opt/data/data.json" # 从文件开始的位置开始读,end表示从结尾开始读 start_position => "beginning" # 多久之内没修改过的文件不读取,0为无限制,单位为秒 ignore_older => 0 # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析 sincedb_path => "/dev/null" # type字段,可表明导入的日志类型 type => "file" } } filter{ if[type] == "stdin"{ mutate{ rename => ["message","timestamp"] } } if[type] == "file"{ json{ source => "message" } } } output { if[type] == "file"{ elasticsearch { #es的ip和端口 hosts => ["http://192.168.57.100:9200"] #集群:hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"] #ES索引名称(自己定义的) index => "mdl_001_20200812" #文档类型 document_type => "MdL_012" #设置数据的id为数据库中的字段 document_id => "%{risk_event_no}" } } if[type] == "stdin"{ stdout{ codec => rubydebug } } }