Integrating Flume with Spark Streaming

Date: 2016-05-12 18:42:16

1. Create the Flume configuration file flume-spark-tail-conf.properties

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'a2'

a2.sources = r2
a2.channels = c2
a2.sinks = k2

### define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/datas/spark_word_count.log
a2.sources.r2.shell = /bin/bash -c

### define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

### define sinks
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = bigdata.eclipse.com
a2.sinks.k2.port = 9999

### bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

2. Upload the JARs needed for the Flume integration to the extraljars directory under the Spark installation

The following JARs are used:
1) spark-streaming-flume_2.10-1.3.0.jar
2) flume-avro-source-1.5.0-cdh5.3.6.jar
3) flume-ng-sdk-1.5.0-cdh5.3.6.jar

3. Start spark-shell with the following command

bin/spark-shell \
--jars /opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/app/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/extraljars/flume-ng-sdk-1.5.0-cdh5.3.6.jar \
--master local[2]

4. Run the following test code at the spark-shell prompt

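-- Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

// Reuse the shell's SparkContext (sc) and process data in 5-second batches
val ssc = new StreamingContext(sc, Seconds(5))

// Start a receiver on the host and port that the Flume avro sink (k2) pushes to
val stream = FlumeUtils.createStream(ssc, "bigdata.eclipse.com", 9999)

// val eventsCount = stream.count.map(cnt => "Received " + cnt + " flume events.")
// eventsCount.print()

// Decode each event body (assumed here to be UTF-8 text), split on spaces, count words per batch
val wordCountStream = stream.map(x => new String(x.event.getBody.array(), "UTF-8")).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountStream.print()

// Start the computation and block until it is stopped
ssc.start()
ssc.awaitTermination()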

---
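
To check the word-count logic without a running Flume agent, the per-batch transformation from step 4 can be reproduced on plain Scala collections. This is only a sketch: BatchWordCount, decode and countWords are hypothetical names that are not part of Spark or Flume, and the UTF-8 encoding is an assumption about the log file.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper (not a Spark or Flume API): mirrors the step 4 pipeline
// on ordinary collections so it can be tried in any Scala REPL.
object BatchWordCount {
  // Decode one event body; UTF-8 is an assumption about the log encoding.
  def decode(body: ByteBuffer): String =
    new String(body.array(), StandardCharsets.UTF_8)

  // Collections equivalent of flatMap / map((_, 1)) / reduceByKey(_ + _) for one batch.
  def countWords(bodies: Seq[ByteBuffer]): Map[String, Int] =
    bodies
      .map(decode)
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

For example, passing the two bodies "hadoop spark" and "spark flume spark" (each wrapped with ByteBuffer.wrap) should give a count of 3 for spark and 1 each for hadoop and flume, in no particular order.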

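5. Start the Flume agent (once the streaming job from step 4 is running, so that the avro sink has a receiver to connect to on port 9999)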

bin/flume-ng agent -c conf -n a2 -f conf/flume-spark-tail-conf.properties -Dflume.root.logger=INFO,console

6. Append content to spark_word_count.log to test
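
One way to generate test input is to append a few lines to the file that the exec source is tailing. The sketch below assumes the log is UTF-8 text. AppendTestLines and appendLines are hypothetical names introduced for illustration only.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical helper for producing test input; not part of Flume or Spark.
object AppendTestLines {
  // Append each line, followed by a newline, to the file at `path`,
  // creating the file first if it does not exist yet.
  def appendLines(path: String, lines: Seq[String]): Unit = {
    val text = lines.map(_ + "\n").mkString
    Files.write(
      Paths.get(path),
      text.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND
    )
  }
}
```

Calling AppendTestLines.appendLines("/opt/datas/spark_word_count.log", Seq("hadoop spark", "spark flume")) from a second Scala session should make the corresponding word counts appear in the spark-shell output within a batch interval or so, provided the agent from step 5 is running.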

Original article: http://blog.csdn.net/yangcongyangling/article/details/51357113

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!