
Spark Study Notes 7: Integrating Spark Streaming with Flume

Date: 2016-05-09 07:02:06


Tags: Spark


I. Start Flume

The flume-conf.properties file:

agent002.sources = sources002
agent002.channels = channels002
agent002.sinks = sinks002


## define sources
agent002.sources.sources002.type = exec
agent002.sources.sources002.command = tail -F /opt/app/apache-flume-1.5.0-bin/monitor/log.input


## define channels
agent002.channels.channels002.type = memory
agent002.channels.channels002.capacity = 10000
agent002.channels.channels002.transactionCapacity = 10000
agent002.channels.channels002.byteCapacityBufferPercentage = 20
agent002.channels.channels002.byteCapacity = 800000


## define sinks
agent002.sinks.sinks002.type = avro
agent002.sinks.sinks002.hostname=study.com.cn
agent002.sinks.sinks002.port=9999


## bind the source and the sink to the channel
agent002.sources.sources002.channels = channels002
agent002.sinks.sinks002.channel = channels002

Start the agent from the Flume installation directory:

bin/flume-ng agent --conf conf --name agent002 --conf-file conf/flume-conf.properties -Dflume.root.logger=INFO,console
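
The properties file above wires one exec source and one avro sink to a single memory channel. Note that a source lists its channels under the plural key `channels`, while a sink names exactly one under the singular key `channel`. As far as I know, Flume's memory channel also requires `transactionCapacity` to be no larger than `capacity`. The following is a minimal sketch of a consistency check for these rules. `FlumeConfCheck`, `parse` and `check` are hypothetical names made up for illustration and are not part of Flume; the sketch only uses `java.util.Properties` and does not handle non-numeric capacity values.

```scala
import java.io.StringReader
import java.util.Properties
import scala.collection.mutable.ListBuffer

// Hypothetical helper for illustration only; not part of Flume.
object FlumeConfCheck {

  // Load properties-style text ("#" lines are treated as comments).
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Return a list of human-readable problems; empty means the wiring looks consistent.
  def check(props: Properties, agent: String): List[String] = {
    def get(key: String): Option[String] =
      Option(props.getProperty(agent + "." + key)).map(_.trim)

    // Space-separated component names, e.g. "agent002.channels = c1 c2".
    def names(key: String): Set[String] =
      get(key).toList.flatMap(_.split("\\s+")).filter(_.nonEmpty).toSet

    val channels = names("channels")
    val problems = ListBuffer.empty[String]

    // Sources use the plural key "channels".
    for (src <- names("sources"); ch <- names("sources." + src + ".channels") if !channels(ch)) {
      problems += ("source " + src + " refers to undefined channel " + ch)
    }

    // Sinks use the singular key "channel".
    for (sink <- names("sinks")) {
      get("sinks." + sink + ".channel") match {
        case Some(ch) if channels(ch) => ()
        case Some(ch) => problems += ("sink " + sink + " refers to undefined channel " + ch)
        case None     => problems += ("sink " + sink + " has no channel")
      }
    }

    // transactionCapacity must not exceed capacity (assumed memory channel rule).
    for (ch <- channels;
         cap <- get("channels." + ch + ".capacity");
         tx <- get("channels." + ch + ".transactionCapacity") if tx.toInt > cap.toInt) {
      problems += ("channel " + ch + ": transactionCapacity " + tx + " > capacity " + cap)
    }

    problems.toList
  }
}
```

For example, `FlumeConfCheck.check(FlumeConfCheck.parse(text), "agent002")` returns an empty list when `text` holds the configuration shown above.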

II. Develop and run the Spark Streaming application

1. Add the dependency to pom.xml

 groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.10
 version = 1.3.0
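
For readers who build with sbt rather than Maven, the same coordinates could be declared roughly as follows. This is only a sketch: the original post uses pom.xml, and the `scalaVersion` value is an assumption chosen to match the `_2.10` suffix of the artifact.

```scala
// build.sbt (sketch; the Scala version is an assumption matching the _2.10 artifact)
scalaVersion := "2.10.4"

// Same groupId, artifactId and version as listed above
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.3.0"
```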

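2. Prepare the required JARs (the ones passed to --jars in the next step): spark-streaming-flume_2.10-1.3.0.jar, flume-avro-source-1.5.0.jar and flume-ng-sdk-1.5.0.jar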

3. Start the Spark shell in local mode (adding the corresponding JARs)

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/app/spark-1.3.0-bin-2.5.0/externaljars/flume-avro-source-1.5.0.jar,/opt/app/spark-1.3.0-bin-2.5.0/externaljars/flume-ng-sdk-1.5.0.jar \
--master local[2]

4. flume001.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

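// Process the stream in 5-second batches, reusing the shell's SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(5))

// Push-based receiver: listens on study.com.cn:9999 for events sent by the Flume avro sink
val stream = FlumeUtils.createStream(ssc, "study.com.cn", 9999)

// Alternative: only count the events received in each batch
// val eventsCount = stream.count.map(cnt => "Received " + cnt + " flume events.")
// eventsCount.print()

// Decode each event body, split it into words, and count the words per batch
val wordCountStream = stream.map(x => new String(x.event.getBody.array())).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountStream.print()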

ssc.start()
ssc.awaitTermination()
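
To see what the word-count chain in flume001.scala computes for a single batch without starting Spark or Flume, here is a minimal sketch on plain Scala collections. `WordCountSketch` and `countWords` are hypothetical names; `groupBy` plus a sum is used as a local stand-in for `reduceByKey`, and the sketch decodes bodies as UTF-8 explicitly, whereas the script above relies on the platform default charset.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical local stand-in for one batch of the streaming job.
object WordCountSketch {

  // bodies: the raw byte payloads of the Flume events in one batch
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(b => new String(b, StandardCharsets.UTF_8)) // decode each event body
      .flatMap(_.split(" "))                            // split each line into words
      .map((_, 1))                                      // pair each word with a count of 1
      .groupBy(_._1)                                    // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word
}
```

Calling `WordCountSketch.countWords(Seq("hadoop hive spark".getBytes("UTF-8")))` yields one entry per word, each with count 1.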

5. Run the application

scala> :load /opt/app/spark-1.3.0-bin-2.5.0/test/flume001.scala
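
With `FlumeUtils.createStream`, the Spark receiver is the side that listens on the given host and port, and the Flume avro sink pushes events to it. If no counts appear, one thing worth checking is whether that port is accepting connections. Below is a minimal sketch of such a check using only `java.net`; `PortCheck`, `isListening` and the 2000 ms default timeout are hypothetical choices for illustration. Because `ssc.awaitTermination()` blocks the Spark shell, it would have to be run from a separate Scala REPL.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper for illustration only.
object PortCheck {

  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isListening(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or host not resolvable
    } finally {
      socket.close()
    }
  }
}
```

For the setup in this post the call would be `PortCheck.isListening("study.com.cn", 9999)`.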

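6. Test

Append a line to the file that the Flume exec source is tailing (run this from /opt/app/apache-flume-1.5.0-bin/monitor). The word counts for that line should then be printed in the Spark shell for the batch that receives it:

echo "hadoop hive spark" >> log.input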


Original article: http://blog.csdn.net/youfashion/article/details/51348762
