标签:spark streaming flume 整合 avro netcat
近期,听了王家林老师的2016年大数据Spark“蘑菇云”行动,需要将flume,kafka和Spark streaming进行整合。
感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111)。Spark streaming的处理是直接输出有几个events。
一、配置文件
Flume 配置文件如下:example5.properties
注意要加上a1.sinks.k1.avro.useLocalTimeStamp = true,这一句,否则,总报这样的错误:“
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp
”
感谢这位兄弟提供解决方案:
http://blog.selfup.cn/1601.html
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.0.10 a1.sources.r1.port = 22222 a1.sources.r1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 192.168.0.10 a1.sinks.k1.port = 11111 a1.sinks.k1.avro.useLocalTimeStamp = true 二、编写处理代码 //创建StreamingContext,10秒一个批次 val ssc = new StreamingContext(sparkConf, Seconds(10)) val hostname = args(0) val port = args(1).toInt val storageLevel = StorageLevel.MEMORY_ONLY val flumeStream = FlumeUtils.createStream(ssc, hostname, port) flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print() //开始运行 ssc.start() //计算完毕退出 ssc.awaitTermination() ssc.stop() 这里的一个坑就是老是报找不到FlumeUtils,其实他在spark-examples-1.6.1-hadoop2.6.0.jar这个包里, 我通过源代码加入了包啊,就是不行 val sparkConf = new SparkConf().setAppName("AdClickedStreamingStats") .setMaster("local[5]").setJars( List( "/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar", "/lib/kafka-0.10.0/kafka-clients-0.10.0.1.jar", "/lib/kafka-0.10.0/kafka_2.10-0.10.0.1.jar", "/lib/spark-1.6.1/spark-streaming_2.10-1.6.1.jar", "/lib/kafka-0.10.0/metrics-core-2.2.0.jar", "/lib/kafka-0.10.0/zkclient-0.8.jar", "/lib/spark-1.6.1/mysql-connector-java-5.1.13-bin.jar", "/lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar", "/opt/spark-1.5.0-bin-hadoop2.6/SparkApps.jar")) 没办法,霸王硬上弓,还是硬上吧, bin/spark-submit --class com.dt.spark.flume.SparkStreamingFlume --jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar --master local[5] SparkApps.jar 192.168.0.10 11111 这些歇菜了吧! 三、运行测试
1、先提交
在spark中提交,产生11111监听
* in/spark-submit --class com.dt.spark.flume.SparkStreamingFlume
--jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar
--master local[5] SparkApps.jar 192.168.0.10 11111
否则会连接不上11111端口。
2、flume启动
$ bin/flume-ng agent --conf conf --conf-file example5.properties --name a1 -Dflume.root.logger=INFO,console
因为avro的方式,会输出到11111端口,然后启动22222端口监听
中间有个坑就是报 Unable to create Rpc client using hostname: 192.168.0.10, port: 11111
这样的错误,原来是bin/flume-ng agent --conf conf --conf-file conf/example5.properties --name a1 -Dflume.root.logger=INFO,console
中的name搞错了
3、触发数据:
telnet localhost 22222
输入字符串,然后会在flume的控制台出现效果。
2016年大数据Spark“蘑菇云”行动之flume整合spark streaming
标签:spark streaming flume 整合 avro netcat
原文地址:http://36006798.blog.51cto.com/988282/1858178