标签:
首先说明,各工具的安装就不在这说明了,网上很多,可自行查看。
我们在这里用实例说明各个工具的配置以及最后展示的效果。
假如我们有一批tracklog日志需要用ELK实时展示出来:
一、收集日志,我们使用flume工具
在日志服务器端布置agent发往收集collect,配置如下:
agent(可多个)
agent.sources = s1 agent.channels = m1 agent.sinks = k1
agent.sources.s1.interceptors=i1 agent.sources.s1.interceptors.i1.type=org.apache.flume.interceptor.HostBodyInterceptor$Builder
# For each one of the sources, the type is defined agent.sources.s1.type = com.source.tailDir.TailDirSourceNG agent.sources.s1.monitorPath=D:\\trackloguc agent.sources.s1.channels = m1 agent.sources.s1.fileEncode=gb2312
# Each sink‘s type must be defined agent.sinks.k1.type = avro agent.sinks.k1.hostname=10.130.2.249 agent.sinks.k1.port=26003 #agent.sinks.k1.type = logger agent.sinks.k1.channel = m1
# Each channel‘s type is defined. #agent.channels.m1.type = memory #agent.channels.m1.capacity=100000 agent.channels.m1.type = file agent.channels.m1.checkpointDir=..\\mobilecheck agent.channels.m1.dataDirs=..\\mobiledata agent.channels.m1.transactionCapacity=3000000 agent.channels.m1.capacity=10000000 |
collect
agent.sources = s1 agent.channels = m1 m2 agent.sinks = k1 k2 agent.source.s1.selector.type=replicating
# For each one of the sources, the type is defined agent.sources.s1.type = avro agent.sources.s1.bind=10.130.2.249 agent.sources.s1.port=26002 agent.sources.s1.channels = m1 m2 #放入Kafka agent.sinks.k1.type = org.apache.flume.plugins.KafkaSink agent.sinks.k1.metadata.broker.list=bdc53.hexun.com:9092,bdc54.hexun.com:9092,bdc46.hexun.com:9092 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.request.required.acks=0 agent.sinks.k1.max.message.size=100 agent.sinks.k1.producer.type=sync agent.sinks.k1.custom.encoding=UTF-8 agent.sinks.k1.custom.topic.name=TrackLogT agent.sinks.k1.channel = m2
#channel采用file方式,因为日志太大 agent.channels.m1.type = file agent.channels.m1.checkpointDir=/opt/modules/apache-flume-1.5.2-bin/tracklog-kafka/checkPoint agent.channels.m1.dataDirs=/opt/modules/apache-flume-1.5.2-bin/tracklog-kafka/dataDir agent.channels.m1.transactionCapacity = 1000000 agent.channels.m1.capacity=1000000 agent.channels.m1.checkpointInterval = 30000
|
二、数据入Kafka
上面collect中的topic需要再Kafka中预先建立,其他的步骤入Kafka已经在collect中配置完毕.
创建topic语句参考:
%{Kafka_HOME}/bin/kafka-topics.sh --create --zookeeper bdc41.hexun.com --replication-factor 3 --partitions 3 --topic TrackLogT |
查看topic数据语句参考:
%{Kafka_HOME}/bin/kafka-console-consumer.sh --zookeeper bdc46.hexun.com:2181,bdc40.hexun.com:2181,bdc41.hexun.com:2181 --topic TrackLogT |
三、从Kafka到ElasticSearch
我们使用logstash工具取kafka的数据入ES,主要原因是logstash工具与es和kibana比较贴合。
假如我们现在想获取topic为TrackLogT的数据入ES,logstash的配置如下:
input{ kafka { zk_connect => "bdc41.hexun.com:2181,bdc40.hexun.com:2181,bdc46.hexun.com:2181,bdc54.hexun.com:2181,bdc53.hexun.com:2181" group_id => "logstash" topic_id => "TrackLogT" reset_beginning => false # boolean (optional) consumer_threads => 5 # number (optional) decorate_events => true } }
filter { #multiline可以多行合一行,采用的行头正则匹配的方式。 multiline { pattern => "^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}\s\d{4}-\d{1,2}-\d{1,2}\s\d{2}:\d{2}:\d{2}" negate => true what => "previous" } #下面是用空格分隔每一行 ruby { init =>"@kname =[‘hostIP‘,‘dateday‘,‘datetime‘,‘ip‘,‘cookieid‘,‘userid‘,‘logserverip‘,‘referer‘,‘requesturl‘,‘remark1‘,‘remark2‘,‘alexaflag‘,‘ua‘,‘wirelessflag‘]" code =>"event.append(Hash[@kname.zip(event[‘message‘].split(/ /))])" remove_field => ["message"] add_field=>{ "logsdate"=>"%{dateday}" } } #下面是替换logsdate字段中的-为空 mutate{ gsub=>["logsdate","-",""] # convert=>{"dateday"=>"integer"} } #对于logsdate的格式不合规范的数据drop if [logsdate] !~ /\d{8}/ { drop{} } #对外网ip进行解析,自动得出地理位置信息 geoip { source => "ip" # type => "linux-syslog" add_tag => [ "geoip" ] } #对ua进行解析 useragent { source => "ua" # type => "linux-syslog" add_tag => [ "useragent" ] } }
output{ #入es elasticsearch{ hosts => [ "10.130.2.53:9200","10.130.2.46:9200","10.130.2.54:9200" ] flush_size=>50000 workers => 5 index=> "logstash-tracklog" } } |
需要注意:
1.上面之所以对logsdate进行替换,原因在于:譬如2016-01-01形式的字段,入ES的时候,会被认为是时间格式,被自动补全为:2016-01-01 08:00:00 ,导致kibana需要按天展示字段不对。
2.对于一些异常数据,譬如logsdate列应为时间数字,如20160101,如果出现一些字母汉字的异常数据,在kibana中显示就会有问题,所以对这些数据drop掉。
3.因为不同的业务数据有不同的格式,需要对数据在filter中处理 ,需要用到的相关插件、相关语法,建议多看看logstash官方文档.
四、数据在Kibana展示
下面的是使用实例,仅供参考:
1.首先进入kibana页面,点击菜单【Setting】-【Indices】,
@:可以填入名称的通配形式,这样可以监控多个索引(一般是按天分索引的数据)
点击Create即可。
2.点击菜单【Discover】,选择你刚刚建立的Setting映射,可以发现如下:
@然后点击右上角的保存,输入名称即可。
@这个是后面展示图所要用到的数据源,当然你也可以在这里搜索你的数据,注意字符串两边最好加双引号。
3.点击【Visualize】,进行各种图标制作。
可以选择制作哪种展示图,例如制作日统计量的柱状图,点击最后一个。
@order by的字段类型必须是date或int型的,这就是为什么前面导数据的时候要强调,数据类型的重要性。
4.最后点击【DashBoard】菜单,进行仪表盘的制作;可以把前面的discover和Visualize报存的数据和图集合在此仪表盘。
Flume-Kafka-Logstash-ElasticSearch-Kibana流程说明
标签:
原文地址:http://blog.csdn.net/shan1369678/article/details/51330389