标签:shang lse tar.gz broker 日志 TE sum inter socket
flume:用来搜集日志,将日志传输至kakfa
kafka:作为缓存,存储来自flume的日志
es:作为存储媒介,存放日志
logstash:真对日志进行过滤处理
获取安装包、解压
1 cd /usr/local/src && wget http://10.80.7.177/install_package/apache-flume-1.7.0-bin.tar.gz && tar zxf apache-flume-1.7.0-bin.tar.gz -C /usr/local/
修改flumen-env.sh脚本,设置启动参数
1 cd /usr/local/apache-flume-1.7.0-bin 2 vim conf/flume-env.sh
1 export JAVA_HOME=/usr/java/jdk1.8.0_121/ 2 export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote" //设置启动jvm使用的内存大小
编辑配置文件
1 vim conf/flume_kfk.conf (备注:该配置文件名字随便起)
1 agent.sources = s1 2 agent.channels = c1 3 agent.sinks = k1 4 5 agent.sources.s1.type=exec 6 7 #要搜集的日志文件据对路径 8 agent.sources.s1.command=tail -F /root/test.log 9 agent.sources.s1.channels=c1 10 agent.channels.c1.type=memory 11 agent.channels.c1.capacity=10000 12 agent.channels.c1.transactionCapacity=100 13 #设置Kafka接收器 14 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink 15 #设置Kafka的broker地址和端口号 16 agent.sinks.k1.brokerList=10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092 17 #设置Kafka的Topic 18 agent.sinks.k1.topic=kafkatest 19 #设置序列化方式 20 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder 21 agent.sinks.k1.channel=c1
创建kafka的topic
1 cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./kafka-topics.sh --create --topic kafkatest --replication-factor 3 --partitions 20 --zookeeper 10.90.11.19:12181
启动flume
1 /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_launcherclick.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
测试
1 测试:向/root/test.log文件追加日志。登录kakfa manager上查看kafkatest 的topic中是否有消息。如果有的话,说明没有问题,如果没有,请检查。
supervisor部署再次不多加赘述,详见:https://www.cnblogs.com/sailq21/p/9227592.html
编辑/etc/supervisord.conf
[unix_http_server] file=/data/ifengsite/flume/supervisor.sock ; the path to the socket file [inet_http_server] ; inet (TCP) server disabled by default port=9001 ; ip_address:port specifier, *:port for all iface [supervisord] logfile=/data/logs/supervisord.log ; main log file; default $CWD/supervisord.log logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB logfile_backups=10 ; # of main logfile backups; 0 means none, default 10 loglevel=info ; log level; default info; others: debug,warn,trace pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid nodaemon=false ; start in foreground if true; default false minfds=1024 ; min. avail startup file descriptors; default 1024 minprocs=200 ; min. avail process descriptors;default 200 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///data/ifengsite/flume/supervisor.sock ; use a unix:// URL for a unix socket [include] files = /etc/supervisord.d/*.conf
编辑flume启动文件
1 [program:flume-push] 2 directory = /usr/local/apache-flume-1.7.0-bin/ 3 command = /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/push.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true 4 autostart = true 5 startsecs = 5 6 autorestart = true 7 startretries = 3 8 user = root 9 redirect_stderr = true 10 stdout_logfile_maxbytes = 20MB 11 stdout_logfile_backups = 20 12 stdout_logfile = /data/ifengsite/flume/logs/flume-supervisor.log
创建目录、并启动supervisor
1 mkdir -p /data/ifengsite/flume/logs/ 2 supervisord -c /etc/supervisord.conf 3 重启supervisor:supervisorctl reload
测试:登录ip:9001查看supervisor
如果flume到kafka没有问题,接下来配置logstash
编辑flume_kfk.conf
1 vim /etc/logstash/conf.d/flume_kfk.conf
1 input{ 2 kafka { 3 bootstrap_servers => ["10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092"] 4 client_id => "test" 5 group_id => "test" 6 consumer_threads => 5 7 decorate_events => true 8 topics => "kafkatest" 9 type => "testqh" 10 } 11 } 12 filter { 13 mutate { 14 gsub => ["message","\\x","\\\x"] 15 } 16 json { 17 source => "message" 18 remove_field => ["message","beat","tags","source","kafka"] 19 } 20 date { 21 match => ["timestamp","ISO8601"] 22 timezone => "Asia/Shanghai" 23 target => "@timestamp" 24 } 25 26 } 27 28 #输出到标准输出用于debug,需要输出到es的时候,可配置为es接收。 29 output{ 30 stdout{ 31 codec => rubydebug 32 } 33 }
统一日志检索部署(es、logstash、kafka、flume)
标签:shang lse tar.gz broker 日志 TE sum inter socket
原文地址:https://www.cnblogs.com/sailq21/p/9230336.html