码迷,mamicode.com
首页 > 其他好文 > 详细

Flink 滑动窗口使用触发器会触发多个窗口的计算

时间:2019-10-27 16:34:31      阅读:470      评论:0      收藏:0      [点我收藏+]

标签:add   更新   使用   个数   extends   tostring   and   第一条   输出   

之前有小伙伴在群里说:滑动窗口使用触发器让每条数据都触发一次计算

但是他并没有得到预期的结果:每条数据都触发一次计算,输出一条结果,而是每天数据都输出了很多条结果

为什么会这样呢?

写了个小案例,来解释这种情况

为了方便使用自定义的 source 开发数据:

class StringSourceFunction extends SourceFunction[String] {

  var flag = true

  override def cancel(): Unit = {

    flag = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {

    while (flag) {
      val str = StringUtil.getRandomString(1).toUpperCase
      ctx.collect(str + "," + StringUtil.getRandomString(1).toUpperCase)
      Thread.sleep(1000)
    }
  }

}

就是个简单的 souce,每秒对外发出随机的 string 字符串(基本一分钟 60 条)

对应的计算程序如下:

env.addSource(new StringSourceFunction)
      .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
      // 每条数据触发一次计算
      //.trigger(CountTrigger.of(1))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
      override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
        // 窗口
        val window = context.window.toString
        // 简单计算下窗口里面的元素个数
        var count: Long = 0
        elements.iterator.foreach(s => count += 1)


        out.collect("time : " + sdf.format(System.currentTimeMillis()) + ", window : " + window + ", element counter : " + count)
      }
    })
      .print("")

定义了一个 一分钟的窗口,滑动间隔是10秒,一条数据就应该属于6个窗口

技术图片

比如: 5 这条数据属于:(-50,10)(-40,20)(-30,30)(-20,40)(-10,50)(0,60) 这6 个窗口

注释 trigger 看结果:

技术图片

10秒滑动间隔,就是10秒有一个滑动一次,一个窗口结束,触发一次计算,输出一个结果(前面6个窗口,因为刚启动数据不够60条)

开启了tirgger 结果就完全不一样了

技术图片

可以看到,第一条数据进去的时候,触发了6次计算,因为它属于6个窗口,tirgger 会触发6次

 欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

技术图片

 

Flink 滑动窗口使用触发器会触发多个窗口的计算

标签:add   更新   使用   个数   extends   tostring   and   第一条   输出   

原文地址:https://www.cnblogs.com/Springmoon-venn/p/11747789.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!