码迷,mamicode.com
首页 > 其他好文 > 详细

Flink水印机制(watermark)

时间:2019-08-31 19:01:39      阅读:124      评论:0      收藏:0      [点我收藏+]

标签:link   结束   add   有一个   size   rri   eve   sig   获取   

Flink流处理时间方式

  • EventTime

    时间发生的时间,例如:点击网站上的某个链接的时间

  • IngestionTime

    某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据

  • ProcessingTime

    某个Flink节点执行某个operation的时间,例如:timeWindow接收到数据的时间

    技术图片

? ?

设置Flink流处理的时间类型

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

问题

技术图片

1. 使用时间窗口来统计10分钟内的用户流量

2. 有一个时间窗口

  • 开始时间为:2017-03-19 10:00:00
  • 结束时间为:2017-03-19 10:10:00

3. 有一个数据,因为网络延迟

  • 事件发生的时间为:2017-03-19 10: 10 :00
  • 但进入到窗口的时间为:2017-03-19 10:10: 02 ,延迟了2秒中

4. 时间窗口并没有将 59 这个数据计算进来,导致数据统计不正确

这种处理方式,根据消息进入到window时间,来进行计算。在网络有延迟的时候,会引起计算误差。

? ?

水印(watermark)

水印就是一个时间戳,可以给每个消息添加一个 允许一定延迟 的时间戳

  • 窗口可以继续计算一定时间范围内延迟的消息
  • 添加水印后,窗口会等 5 秒,再执行计算。若超过5秒,则舍弃。
  • 窗口执行计算时间由 水印时间 来触发,当接收到消息的 watermark >= endtime ,触发计算

    ? ?

    技术图片

? ?

Flink提供添加水印的API

????????val watermarkData: DataStream[Message] =

????????clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]

????????{

???????????var currentTimestamp: Long = 0L

???????????val maxDelayTime = 5000L

???????????var watermark: Watermark = null

????????// 获取当前的水印

???????????override def getCurrentWatermark = {

????????????watermark = new Watermark(currentTimestamp - maxDelayTime)

????????????watermark

??????????}

???????????// 时间戳抽取操作

???????????override def extractTimestamp(t: Message, l: Long) = {

????????????val timeStamp = t.timestamp

????????????currentTimestamp = Math.max(timeStamp, currentTimestamp)

????????????currentTimestamp

??????????}

?????????})

? ?

? ?

Flink水印机制(watermark)

标签:link   结束   add   有一个   size   rri   eve   sig   获取   

原文地址:https://www.cnblogs.com/starzy/p/11439997.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!