Spark-Streaming DirectKafka count example

Date: 2019-07-28 19:56:51

The Spark Streaming DirectKafka count works much like the plain Kafka count, except that it uses the Direct approach, which is built on Kafka's low-level API. In the code, the main difference is the call to createDirectStream.
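
To make the difference concrete, the two ways of creating the stream can be put side by side. This is only a sketch, assuming the spark-streaming-kafka artifact for Kafka 0.8 that provides org.apache.spark.streaming.kafka.KafkaUtils. The object name, the helper names, and the zkQuorum and groupId parameters are illustrative and do not come from the original article.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Illustrative helper object, not part of the original article.
object KafkaStreamStyles {

  // Receiver-based stream: connects through ZooKeeper with a consumer group.
  // zkQuorum and groupId are placeholder parameters; each topic gets one consumer thread.
  def receiverStream(ssc: StreamingContext, zkQuorum: String, groupId: String,
                     topics: Set[String]): DStream[(String, String)] =
    KafkaUtils.createStream(ssc, zkQuorum, groupId, topics.map(_ -> 1).toMap)

  // Kafka parameters for the direct stream: it is given the broker list, not ZooKeeper.
  def directParams(brokers: String): Map[String, String] =
    Map("metadata.broker.list" -> brokers)

  // Direct stream: no receiver; each batch reads a range of offsets from the brokers.
  def directStream(ssc: StreamingContext, brokers: String,
                   topics: Set[String]): DStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, directParams(brokers), topics)
}
```

Both helpers return a stream of (key, value) pairs, so the downstream counting code can stay the same whichever one is used.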

The counting code is as follows:

package com.hw.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <brokers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 60 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    // "smallest" has the same effect as from-beginning: start from the earliest available offset
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset"->"smallest"
    )
    // Create the DStream
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

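    // Get the message values, take the second comma-separated field of each line,
    // count how often each value occurs in the batch, and print the result
    val lines = messages.map(_._2)
    // Use map, not flatMap: flatMap over a String would break the field into characters.
    // Lines with fewer than two fields are skipped to avoid an index error.
    val words = lines.map(_.split(",")).filter(_.length > 1).map(_(1))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()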

    // Start the computation
    ssc.start()
    ssc.awaitTermination()

  }

}
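
The per-batch logic can be checked locally without Spark or Kafka. The following is a minimal sketch in plain Scala, assuming the job is meant to count records by their second comma-separated field. The FieldCount object, its function names, and the sample lines in the usage note are hypothetical and are not taken from the original article.

```scala
// Illustrative helper object, not part of the original article.
object FieldCount {

  // Return the second comma-separated field, or None if the line has fewer than two fields.
  def secondField(line: String): Option[String] = {
    val fields = line.split(",")
    if (fields.length > 1) Some(fields(1)) else None
  }

  // Count how often each second field occurs in one batch of lines.
  // Lines without a second field are skipped.
  def countSecondFields(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(line => secondField(line).toList)
      .groupBy(identity)
      .map { case (field, occurrences) => (field, occurrences.size.toLong) }
}
```

For example, FieldCount.countSecondFields(Seq("1,login,ok", "2,logout,ok", "3,login,fail", "malformed")) yields a map with login -> 2 and logout -> 1. In the streaming job, reduceByKey performs the same aggregation for each batch.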

For how to start the related Flume and Kafka services, see:

https://www.cnblogs.com/hanwen1014/p/11260456.html

Original article: https://www.cnblogs.com/hanwen1014/p/11260477.html
