码迷,mamicode.com
首页 > 其他好文 > 详细

spark窗口操作

时间:2019-12-25 17:36:28      阅读:93      评论:0      收藏:0      [点我收藏+]

标签:each   serialize   for   start   scribe   rmi   The   test   socket   

流数据的窗口操作:窗口操作,即把几个批次的数据整合到一个窗口里计算,并且窗口根据步长不断滑动。

本质:把小批次,小颗粒的数据任意进行大批次和大颗粒的数据统计,意味着批次采集周期不用设置太大,可以通过滑动窗口来调整数据出现的粒度。

code:

 

package com.home.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Description:
  *  TODO 窗口操作,即把几个批次的数据整合到一个窗口里计算,并且窗口根据步长不断滑动
  *   所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
  *   窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。
  *   如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。
  *   而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,
  *   并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。
  **/
object Ex_window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark streaming wordcount")
    
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    //环境对象,设置采集周期
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))
    // TODO: 可以通过ssc.sparkContext 来访问SparkContext或者通过已经存在的SparkContext来创建StreamingContext

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.44.10:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("test")

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        topics,
        kafkaParams
      )
    )


    //采集周期为5秒,窗口为15秒(包含三个采集批次),滑动步长为5秒,即每个批次滑动一次。
    val words: DStream[String] = kafkaStream.flatMap(t=>t.value().split(" ")).window(Seconds(15),Seconds(5))


    //    val words: DStream[String] = socketStream.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print

    // Start the computation
    // 通过 streamingContext.start()来启动消息采集和处理
    scc.start()

    // Wait for the computation to terminate
    // 通过streamingContext.stop()来手动终止处理程序
    scc.awaitTermination()

  }
}

spark窗口操作

标签:each   serialize   for   start   scribe   rmi   The   test   socket   

原文地址:https://www.cnblogs.com/asker009/p/12097912.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!