Flume + Kafka + Spark Streaming real-time data processing test
I. Flume: pull data from the data source
# Flume agent: exec source (tail -F a file) -> memory channel -> Kafka sink
# (saved as conf/test.properties, which the start command below refers to)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: tail the test file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
a1.sources.r1.restartThrottle = 1000
a1.sources.r1.logStdErr = true
#a1.sources.r1.restart = true
a1.sources.r1.channels = c1

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keepalive = 100

# Sink: push events to the Kafka brokers, topic "test"
a1.sinks.k1.type = org.apache.flume.plugins.KafkaSink
a1.sinks.k1.metadata.broker.list = 192.168.22.7:9092,192.168.22.8:9092,192.168.22.9:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks = 1
a1.sinks.k1.max.message.size = 1000000
a1.sinks.k1.producer.type = sync
a1.sinks.k1.custom.encoding = UTF-8
a1.sinks.k1.custom.topic.name = test
a1.sinks.k1.channel = c1
a1.sinks.k1.product.source.name = 6
Adjust these two settings for your environment: the file to tail and the Kafka topic to write to:
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
a1.sinks.k1.custom.topic.name = test
Start the Flume agent:
bin/flume-ng agent -c conf -f conf/test.properties -n a1 -Dflume.root.logger=INFO,console
II. Kafka receives the data
1. Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start Kafka
bin/kafka-server-start.sh config/server.properties &
3. Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
In a cluster deployment, replace localhost with the address of the cluster's master node.
4. List the Kafka topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
III. Spark Streaming processes the data
1. Test with the KafkaWordCount example bundled with Spark
Go to the Spark directory and run:
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 my-consumer-group test 1
2. Append data to the source file
echo "test test" >> test.txt
Spark Streaming counts the words in the source data and prints the counts.
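For reference, below is a minimal sketch of what a KafkaWordCount-style job does, written against the old receiver-based spark-streaming-kafka API (KafkaUtils.createStream). The object name and the 2-second batch interval are illustrative assumptions, and the per-batch count is simplified (the bundled example also uses windowed counts and checkpointing); it is not a copy of the Spark source.

// Sketch of a KafkaWordCount-style Spark Streaming job (receiver-based Kafka API).
// Arguments mirror the run-example invocation: <zkQuorum> <group> <topics> <numThreads>.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaWordCountSketch")
    // 2-second batch interval is an arbitrary choice for this sketch.
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Map each topic name to the number of receiver threads reading it.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Each Kafka record arrives as (key, message); keep only the message text.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Split lines into words and count them within each batch.
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

With the stream running, each line echoed into test.txt flows through Flume into the test topic and shows up in the batch output as (word, count) pairs.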
Original article: http://www.cnblogs.com/aijianiula/p/5206719.html