码迷,mamicode.com
首页 > 其他好文 > 详细

Flink 操作示例 —— 水印

时间:2020-04-06 17:29:42      阅读:298      评论:0      收藏:0      [点我收藏+]

标签:record   pex   upd   cas   exec   null   最新   update   second   

内置水印生成器

1.有序生成

只需提取事件时间的时间戳作为水印即可。

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

 

2.有界无序生成策略

设置延迟的上限。我们知道每个事件都会延迟一段时间才到达,而这些延迟差异会比较大,所以有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟不会超过某个最大值。

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

 

自定义水印生成器

1.定期水印

AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流元素,或纯粹基于处理时间)。
通过 ExecutionConfig.setAutoWatermarkInterval(...)  定义生成水印的间隔(每n毫秒)。 每次都会调用分配者的 getCurrentWatermark() 方法,如果返回的水印非空且大于前一个水印,则将发出一个新的水印。

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }

 

2.带标记的水印

AssignerWithPunctuatedWatermarks 根据元素的特定标记生成新的水印。 对于此类,Flink将首先调用 extractTimestamp(...) 方法为该元素分配时间戳,然后立即在该元素上调用 checkAndGetNextWatermark(...)方法。
将 checkAndGetNextWatermark(...) 方法传递给 extractTimestamp(...) 方法中分配的时间戳,并可以决定是否要生成水印。 每当 checkAndGetNextWatermark(...) 方法返回非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}

 

参考文章


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html

http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html

Flink 操作示例 —— 水印

标签:record   pex   upd   cas   exec   null   最新   update   second   

原文地址:https://www.cnblogs.com/lemos/p/12642793.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!