标签:link 结束 add 有一个 size rri eve sig 获取
Flink流处理时间方式
时间发生的时间,例如:点击网站上的某个链接的时间
某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据
某个Flink节点执行某个operation的时间,例如:timeWindow接收到数据的时间
? ?
设置Flink流处理的时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
问题
1. 使用时间窗口来统计10分钟内的用户流量
2. 有一个时间窗口
3. 有一个数据,因为网络延迟
4. 时间窗口并没有将 59 这个数据计算进来,导致数据统计不正确
这种处理方式,根据消息进入到window时间,来进行计算。在网络有延迟的时候,会引起计算误差。
? ?
水印(watermark)
水印就是一个时间戳,可以给每个消息添加一个 允许一定延迟 的时间戳
? ?
? ?
Flink提供添加水印的API
????????val watermarkData: DataStream[Message] = ????????clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] ????????{ ???????????var currentTimestamp: Long = 0L ???????????val maxDelayTime = 5000L ???????????var watermark: Watermark = null ????????// 获取当前的水印 ???????????override def getCurrentWatermark = { ????????????watermark = new Watermark(currentTimestamp - maxDelayTime) ????????????watermark ??????????} ???????????// 时间戳抽取操作 ???????????override def extractTimestamp(t: Message, l: Long) = { ????????????val timeStamp = t.timestamp ????????????currentTimestamp = Math.max(timeStamp, currentTimestamp) ????????????currentTimestamp ??????????} ?????????}) |
? ?
? ?
标签:link 结束 add 有一个 size rri eve sig 获取
原文地址:https://www.cnblogs.com/starzy/p/11439997.html