码迷,mamicode.com
首页 > 其他好文 > 详细

DStream 转换操作------有状态转换操作

时间:2017-10-20 18:29:17      阅读:191      评论:0      收藏:0      [点我收藏+]

标签:context   tap   foldleft   wordcount   ati   file   streaming   second   check   

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object DStream_转换操作 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("转换操作").setMaster("local[2]")
    val sc=new StreamingContext(conf,Seconds(4))
    val lines=sc.socketTextStream("localhost",8899)
    sc.checkpoint("file:///usr/local2/spark/mycode/kafa3/checkpoint")

    val words=lines.flatMap(x=>x.split(" "))
    val wordsStream=words.map(x=>(x,1))
    3.val stateStream=wordsStream.updateStateByKey[Int](update)
      sc.checkpoint("file:///usr/local2/spark/mycode/kafa2/checkpoint")
    1. //val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(16),Seconds(4),2)//DStream有状态转换操作
    2. val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(16),Seconds(4),2)
     wordCount.print(100)
    stateStream.print()
    sc.start()
    sc.awaitTermination()
  }
   val update=(values:Seq[Int],state:Option[Int])=>{
     val currentCount=values.foldLeft(0)(_+_)
     val previousCount= state.getOrElse(0)
     Some(currentCount+previousCount)
   }
}

注意:

reduceByKeyAndWindow中的Seconds(16)是滑动窗口长度,Seconds(4)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)这两个值必须是 new StreamingContext(conf,Seconds(4)) 中Seconds(4)的倍数(>=1)
如果第二个4<滑动窗口时间间隔 程序结果的时间线就变成了以滑动窗口时间间隔为准
1,2,3区别:
1.会保留历史对象的名字列表
2.不会保留
3.在历史值的基础上累加,但(1,2)会随着窗口滑动,所有对象的值会变为0
4.(1和2适合统计实时时间段内词频)

 

DStream 转换操作------有状态转换操作

标签:context   tap   foldleft   wordcount   ati   file   streaming   second   check   

原文地址:http://www.cnblogs.com/soyo/p/7700356.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!