
Example: reading Kafka (2.10) from Spark (2.10) in Scala

Posted: 2018-03-19 20:53:42


1. Add the dependencies to pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Note that all Scala-dependent artifacts must share the same Scala binary suffix: here everything uses _2.11, since mixing _2.10 and _2.11 jars on one classpath leads to runtime linkage errors.
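If you build with sbt instead of Maven, the equivalent declarations (a sketch assuming the same versions as above; `%%` appends the project's Scala binary suffix automatically) would be:

```scala
// build.sbt fragment -- sketch, same versions as the Maven example above
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"           % "2.1.0",
  "org.apache.kafka" %% "kafka"                     % "0.8.2.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0"
)
```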

2. Code
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Demo01 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    sparkConf.setMaster("local[2]")
    // One micro-batch every 3 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val brokers = "hadoop01:9092"
    val topics = "test"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    // Direct (receiver-less) stream; each record arrives as a (key, value) pair
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)
    val lines = messages.map(_._2)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    // Writes one output directory per batch, using this path as the prefix
    wordCounts.saveAsTextFiles("hdfs://hadoop01:9000/spark/wordcount.txt")
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
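The per-batch transformation above is ordinary word counting. A minimal standalone sketch of the same flatMap/map/reduce logic on a plain Scala collection (no Spark required; `groupBy` plus a sum stands in for `reduceByKey`) looks like this:

```scala
object WordCountSketch {
  // Same pipeline as the DStream version, applied to a local Seq of lines
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))          // split each line into words
      .map(word => (word, 1))         // pair each word with a count of 1
      .groupBy(_._1)                  // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts

  def main(args: Array[String]): Unit = {
    val result = countWords(Seq("a b a", "b c"))
    println(result) // e.g. Map(a -> 2, b -> 2, c -> 1)
  }
}
```

Each 3-second micro-batch in the streaming job computes exactly this, but partitioned across the cluster.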


Original post: https://www.cnblogs.com/runnerjack/p/8604410.html
