1、在flume官方网站下载最新的flume
2、解决flume安装包
cd /export/software/
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-flume-1.6.0-bin flume
3、创建flume配置文件
cd /export/servers/flume/conf/
mkdir myconf
vi exec.conf
输入以下内容:
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/1.log a1.sources.r1.channels = c1 a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = myOrder #注意这里的topic a1.sinks.k1.brokerList = kafka01:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1
注:配置完毕,flume环节的工作基本完成。接下来准备目标数据文件。
4、准备目标数据的目录
mkdir -p /export/data/flume_sources/click_log
5、通过脚本创建目标文件并生产数据
for((i=0;i<=50000;i++));
do echo "message-"+$i >>/export/data/flume_sources/click_log/1.log;
done
注意:脚本名称叫做click_log_out.sh 需要使用root用户赋权。 chmod +x click_log_out.sh
6、开始打通所有流程
各个节点启动zookeeper集群
第一步:启动kafka集群(mini1,mini2,mini3-----kafka1,kafka2,kafka3)
nohup kafka-server-start.sh /export/servers/kafka/config/server.properties &
第二步:创建一个topic并开启consumer
kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 4 --topic myOrder
启动kafka consumer窗口(----consumer)
kafka-console-consumer.sh --zookeeper mini1:2181 --from-beginning --topic myOrder
第三步:执行数据上产的脚本(mini1-----dataSource)
sh click_log_out.sh
第四步:启动flume客户端(mini1-----flume)
./bin/flume-ng agent -n a1 -c conf -f conf/myconf/exec.conf -Dflume.root.logger=INFO,console
第五步:在第三步启动的kafka consumer窗口查看效果