标签:字段 sts server1 request htm img webkit byte apache
从上图中可以看到,logstash主要包含三大模块:
FILTERS是重点,来看看它常用到的几个插件:
*注意:codec也是经常会使用到的,它主要作用在INPUTS和OUTPUTS中,[提供有json的格式转换、multiline的多行日志合并等
一个简单的配置文件:
input { log4j { port => "5400" } beats { port => "5044" } } filter { # 多个过滤器会按声明的先后顺序执行 grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { elasticsearch { action => "index" hosts => "127.0.0.1:9200" # 或者 ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] ,支持均衡的写入ES的多个节点,一般为非master节点 index => "logstash-%{+YYYY-MM}" } stdout { codec=> rubydebug } file { path => "/path/to/target/file" } }
1. NodeJS 日志
$time - $remote_addr $log_level $path - $msg
2017-03-15 18:34:14.535 - 112.65.171.98 INFO /root/ws/socketIo.js - xxxxxx与ws server断开连接
filebeat: prospectors: - document_type: nodejs #申明type字段为nodejs,默认为log paths: - /var/log/nodejs/log #日志文件地址 input_type: log #从文件中读取 tail_files: true #以文件末尾开始读取数据 output: logstash: hosts: ["${LOGSTASH_IP}:5044"] #General Setting name: "server1" #设置beat的名称,默认为主机hostname
filter { if [type] == "nodejs" { #根据filebeat中设置的type字段,来过滤不同的解析规则 grok{ match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} - %{IPORHOST:clientip} %{LOGLEVEL:level} %{PATH:path} - %{GREEDYDATA:msg}" } } geoip { source => "clientip" #填写IP字段 } } }
这里是否发现@timestamp与timestamp不一致,@timestamp表示该日志的读取时间,在elasticsearch中作为时间检索索引。下面讲解Nginx日志时,会去修正这一问题。
2. Nginx 访问日志
$remote_addr - $remote_user [$time_local]
"$request" $status $body_bytes_sent "$http_referer"
"$http_user_agent" "$http_x_forwarded_for"
112.65.171.98 - - [15/Mar/2017:18:18:06 +0800] "GET /index.html HTTP/1.1" 200 1150 "http://www.yourdomain.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" "-"
- document_type: nginx paths: - /var/log/nginx/access.log #日志文件地址 input_type: log #从文件中读取 tail_files: true #以文件末尾开始读取数据
filter { if [type] == "nginx" { grok{ match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ] target => "@timestamp" #可省略 } } }
目前为止我们解析的都是单行的日志,向JAVA这样的,若果是多行的日志我们又该怎么做呢?
‘2017-03-16 15:52:39,580 ERROR TestController:26 - test: java.lang.NullPointerException at com.test.TestController.tests(TestController.java:22) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)‘
- document_type: tomcat paths: - /var/log/java/log #日志文件地址 input_type: log #从文件中读取 tail_files: true #以文件末尾开始读取数据 multiline: pattern: ^\d{4} match: after negate: true
filter { if [type] == "tomcat" { grok{ match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVALOGMESSAGE:msg}" } } date { match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ] } } }
Filebeat配置讲解
multiline 合并多行日志:
当然在logstash input中使用codec multiline设置是一样的
小技巧:关于grok的正则匹配,官方有给出Grok Constructor方法,在这上面提供了debugger、自动匹配等工具,方便大家编写匹配规则
主要的选项包括:
# action,默认是index,索引文档(logstash的事件)(ES架构与核心概念参考)。 # host,声明ES服务器地址端口 # index,事件写入的ES index,默认是logstash-%{+YYYY.MM.dd},按天分片index,一般来说我们会按照时间分片,时间格式参考http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。
详情请参考我的另外一篇文章:https://www.cnblogs.com/caoweixiong/p/11791396.html
参考:
https://www.jianshu.com/p/cc8dbd3bb401
标签:字段 sts server1 request htm img webkit byte apache
原文地址:https://www.cnblogs.com/caoweixiong/p/12576375.html