标签:ELK logstash input output filter
一、logstash浅析logstash就是一个过滤器,收集器。他是一个重量级的日志收集工具。使用logstash会消耗大量的服务器资源,所以通常需求高一点的配置。
过滤器:它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。
收集器:它可以采集指定地址的日志,并传输到elasticsearch、redis等目的地。
二、logstash的处理展示:
三、插件
input(输入插件):日志来源。
a:读取指定地址的日志
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
b:接收filebeat的日志
input {
beats {
port => 5044
}
}
c:读取redis中的数据
input {
redis {
batch_count => 1
# EVAL命令返回的事件数目,设置为5表示一次请求返回5条日志信息
data_type => "list"
# logstash redis插件工作方式
key => "logstash-test"
# 监听的键值
host => "127.0.0.1"
# redis端口号
password => "123456"
# 如果有安全认证,此项为认证密码
db => 0
# 如果应用使用了不同的数据库,此为redis数据库的编号,默认为0。
threads => 1
# 启用线程数量
}
}
d:读取syslog、rsyslog的数据
input {
syslog {
port => "514"
}
}
e:读取网络数据tcp (旧数据的处理)
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
通常与nc命令配合使用
# nc 127.0.0.1 8888 < olddata
f:标准输入stdin
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
# bin/logstash -f stdin.conf 后面输入字段,就会有输出(不常用)
2.filter
我的理解filter模块就是对日志的一个处理过程。简单介绍我使用过的几个插件。
a:grok 插件 :拆分日志,筛选需要的值,并进行排序。配合这个字段 remove_field => ["message"] 可以删除不需要的值。nginx日志为例:
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:time_iso8601}\] \[%{NUMBER:pid}\] \[%{IPORHOST:remote_addr}\] \[%{IPORHOST:http_host}(:%{NUMBER:http_host_port})?\] \[%{IPORHOST:upstream_addr}(:%{NUMBER:upstream_port})?\] \[%{WORD:verb} %{URIPATH:request_uri}(?:%{URIPARAM:request_parameter})? HTTP/%{NUMBER:httpversion}\] \[%{NUMBER:status}\] \[%{BASE10NUM:upstream_response_time} s\] \[%{BASE10NUM:request_time} s\] \[(?:%{NUMBER:bytes_sent}|-) bytes\] \[(?:%{NUMBER:body_bytes_sent}|-) bytes\] \[%{GREEDYDATA:http_user_agent}\]" }
remove_field => ["message"] #一个不想要的字段
}
b:mutate插件:处理数据的格式,特别是时间
mutate {
convert => {
"upstream_response_time" => "float"
"request_time" => "float"
"status" => "integer"
}
}
c:ruby插件 处理事件event,可以使用各种ruby语法
if [logtype] == "Feed" {
ruby {
code => '
request_uri = event.get("request_uri")
if request_uri.nil?
return
end
}
解释一下(定义request_uri字段,假如字段为空,结束操作)
d:date插件 定义时间
date {
locale => "en"
match => ["time_iso8601", "ISO8601"]
}
解释一下(当前日志的时间是美国时间,输出为北京时间)
f:drop的使用
if [message] !~ "^error" {
drop { }
}
解释一下(假如信息匹配到error,就过滤掉)
3.output 输出到
a:elasticsearch 最常用的插件
output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
b:当然也可以输入到redis中 进行进一步处理(参数选择性添加)
output {
redis{
batch => true
batch_events => 50
batch_timeout => 5
codec => plain
congestion_interval => 1
congestion_threshold => 5
data_type => list
db => 0
host => ["127.0.0.1:6379"]
key => xxx
# list或channel的名字
password => xxx
# redis的密码,默认不使用
port => 6379
}
}
标签:ELK logstash input output filter
原文地址:http://blog.51cto.com/13043960/2121700