
Unified Log Search Deployment (ES, Logstash, Kafka, Flume)



flume: collects the logs and ships them to Kafka

kafka: acts as a buffer, storing the logs arriving from Flume

es: serves as the storage backend that holds the logs

logstash: filters and processes the logs

Flume deployment

Download the package and extract it

cd /usr/local/src && wget http://10.80.7.177/install_package/apache-flume-1.7.0-bin.tar.gz && tar zxf apache-flume-1.7.0-bin.tar.gz -C /usr/local/

Edit the flume-env.sh script to set the startup parameters

cd /usr/local/apache-flume-1.7.0-bin
vim conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121/
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"   # JVM heap size used by the agent at startup
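As an optional sanity check (my addition, not in the original article), you can ask the flume-ng launcher to print its version; if JAVA_HOME is wrong it will fail here rather than at agent startup:

cd /usr/local/apache-flume-1.7.0-bin
./bin/flume-ng version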

Edit the configuration file

vim conf/flume_kfk.conf    (note: the file name can be anything)
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type=exec

# absolute path of the log file to collect
agent.sources.s1.command=tail -F /root/test.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
agent.sinks.k1.brokerList=10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092
# Kafka topic
agent.sinks.k1.topic=kafkatest
# serialization
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1

Create the Kafka topic

cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./bin/kafka-topics.sh --create --topic kafkatest --replication-factor 3 --partitions 20 --zookeeper 10.90.11.19:12181
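To confirm the topic really has 20 partitions and a replication factor of 3, you can describe it (this verification step is my addition, using the same ZooKeeper address as above):

cd /data1/kafka/kafka_2.11-0.10.1.0/
./bin/kafka-topics.sh --describe --topic kafkatest --zookeeper 10.90.11.19:12181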

Start Flume

/usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
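Because the agent is started with -Dflume.monitoring.type=http and -Dflume.monitoring.port=9876, Flume exposes its channel and sink counters as JSON over HTTP. A quick check (my addition, assuming you run it on the Flume host itself) is:

curl http://127.0.0.1:9876/metrics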

Test

Append a line to /root/test.log, then log in to Kafka Manager and check whether the kafkatest topic has received the message. If it has, the pipeline works; if not, go back and troubleshoot.
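If you do not have Kafka Manager handy, a rough command-line check (my addition) is to append a line to the source file and read the topic with the console consumer; the broker address is taken from the sink configuration above:

echo "hello flume $(date)" >> /root/test.log
cd /data1/kafka/kafka_2.11-0.10.1.0/
./bin/kafka-console-consumer.sh --bootstrap-server 10.90.11.19:19092 --topic kafkatest --from-beginning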

Deploy supervisor to monitor Flume

Installing supervisor itself is not covered again here; see: https://www.cnblogs.com/sailq21/p/9227592.html

Edit /etc/supervisord.conf

[unix_http_server]
file=/data/ifengsite/flume/supervisor.sock   ; the path to the socket file
[inet_http_server]         ; inet (TCP) server disabled by default
port=9001        ; ip_address:port specifier, *:port for all iface
[supervisord]
logfile=/data/logs/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false               ; start in foreground if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///data/ifengsite/flume/supervisor.sock ; use a unix:// URL  for a unix socket
[include]
files = /etc/supervisord.d/*.conf

Create the supervisor program file for Flume (under /etc/supervisord.d/, per the include section above)

[program:flume-push]
directory = /usr/local/apache-flume-1.7.0-bin/
command = /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
autostart = true
startsecs = 5
autorestart = true
startretries = 3
user = root
redirect_stderr = true
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20
stdout_logfile = /data/ifengsite/flume/logs/flume-supervisor.log

Create the log directory and start supervisor

mkdir -p /data/ifengsite/flume/logs/
supervisord -c /etc/supervisord.conf
# to restart supervisor: supervisorctl reload
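Once supervisor is running, the flume-push program defined above can be checked and controlled with supervisorctl, for example:

supervisorctl status flume-push      # should show RUNNING after startsecs have passed
supervisorctl restart flume-push     # restart the agent after changing its configuration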

Test: open ip:9001 in a browser to check supervisor's web UI

If the Flume-to-Kafka path works, configure Logstash next.

Edit the Logstash pipeline configuration flume_kfk.conf

vim /etc/logstash/conf.d/flume_kfk.conf
input {
    kafka {
        bootstrap_servers => ["10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092"]
        client_id => "test"
        group_id => "test"
        consumer_threads => 5
        decorate_events => true
        topics => "kafkatest"
        type => "testqh"
    }
}
filter {
    mutate {
        gsub => ["message","\\x","\\\x"]
    }
    json {
        source => "message"
        remove_field => ["message","beat","tags","source","kafka"]
    }
    date {
        match => ["timestamp","ISO8601"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
}

# Output to stdout for debugging; when the events need to go to ES, replace this with an elasticsearch output (see the sketch below).
output {
    stdout {
        codec => rubydebug
    }
}
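Once the rubydebug output looks correct, the stdout block can be swapped for an elasticsearch output. The snippet below is only a sketch: the ES host and index pattern are placeholders, since the article does not give the actual Elasticsearch addresses.

output {
    elasticsearch {
        # assumption: replace with the real ES nodes of your cluster
        hosts => ["10.90.11.x:9200"]
        index => "kafkatest-%{+YYYY.MM.dd}"
    }
}

Then restart Logstash against this file, e.g. with -f /etc/logstash/conf.d/flume_kfk.conf (the binary's location depends on how Logstash was installed).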

 


Original post: https://www.cnblogs.com/sailq21/p/9230336.html
