标签:
window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing
下。
一个Window
代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。
Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:
public abstract long maxTimestamp();
用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现Window
时,子类应该override equals
和hashCode
这两个方法,以使得在逻辑上两个相等的window被认为是同一个。
GlobalWindow
是一个全局窗口,被实现为单例模式。其maxTimestamp
被设置为Long.MAX_VALUE
。
该类内部有一个静态类定义了GlobalWindow
的序列化器:Serializer
。
TimeWindow
表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:
TimeWindow
表示的时间间隔为[start, end)。其maxTimestamp
的实现为:
public long maxTimestamp() {
return end - 1;
}
其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较start
,end
这两个属性。
TimeWindow
也在内部实现了序列化器,该序列化器主要针对start
和end
两个属性。
元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:
timestamp
的元素element
分配给一个或多个窗口,并返回窗口集合WindowAssigner
关联的默认触发器WindowAssigner
分配的窗口的序列化器整个类型继承图如下:
下面会谈到很多基于时间的窗口,这里有两个概念,分别是时间类型
和窗口类型
:
时间类型:
窗口类型:
assignWindows
方法中返回的一般都是Collections.singletonList()
)该分配器对应于窗口GlobalWindow
,它将所有的元素分配给同一个GlobalWindow
(本质上而言,GlobalWindow
也只有一个实例)。跟GlobalWindow
的实现方式一样,GlobalWindows
也被实现为单例模式。
方法实现:
GlobalWindow
单实例的集合对象NerverTrigger
依据给定的窗口大小,结合event-time,返回存储TimeWindow
单实例的集合。getDefaultTrigger
方法返回EventTimeTrigger
类型的实例。
依据给定窗口的大小,结合processing-time,返回存储TimeWindow
单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。getDefaultTrigger
方法返回的是ProcessingTimeTrigger
类型的实例。
Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动
值,即slide
。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。
在Sliding窗口中,assignWindows
方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:size/slide
,然后循环初始化给定的size
内不同slide
的窗口对象。
类似SlidingProcessingTimeWindows
只不过窗口的start
参数的计算方式依赖于系统时间戳。
evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素
它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前
该接口只定义了一个方法:
int evict(Iterable<StreamRecord<T>> elements, int size, W window);
接口的返回值即表示要剔除元素的个数。
Flink内置实现了三个Evitor
:
这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
break;
}
toEvict++;
}
return toEvict;
}
大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。
然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。
基于容量的Evictor,它通过比对evict
方法的第二个参数size
来判断应该剔除多少个元素。具体的实现:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
if (size > maxCount) {
return (int) (size - maxCount);
} else {
return 0;
}
}
基于给定的阈值threshold
和deltaFunction
来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。
Flink中仅有一个类Time
来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个Time
对象,需要两个参数:
TimeUnit
的实例,表示时间间隔的单位该类提供的很多静态方法提供对不同unit的设置。
Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。
以粗粒度来看,Flink主要提供了三种形式的触发方式:
这体现为Trigger的三个主要的抽象方法:
CountTrigger
以上这些方法中都有一个共同的参数:TriggerContext
。
顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger
的内部接口:
watermark
onProcessingTime
onEventTime
其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法getKeyValueState
也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。
Trigger
中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过TriggerResult
来表达的。它是一个枚举类型,有这么几个枚举值:
FIRE_AND_PURGE
:同时具备FIRE
和PURGE
两种属性产生的行为Flink内置实现了很多触发器,完整的类图如下:
这些触发器都具有一些共性,这里一并说明:
Trigger
中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回TriggerResult.CONTINUE
(其实个人认为这种设计方式有欠妥当,因为不利于扩展)TriggerContext
的getPartitionedState
方法进行存取基于事件时间的触发器,对应onEventTime
基于当前系统时间的触发器,对应onProcessingTime
该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark
。首次触发的判断位于onElement
中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first
状态标识为false。具体实现如下:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
if (first.value()) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
first.update(false);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
持续的触发依赖于在onEventTime
中不断注册下一次触发的定时器:
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp
来判断是否需要触发,不过它的循环注册过程是在onElement
中。
基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement
中,逻辑很简单,先累加如果大于给定的阈值则触发:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult
的实例,进行相应的判断,最后返回FIRE_AND_PURGE
枚举值。
基于DeltaFunction
和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在onElement
中。
本篇还是侧重于分析跟窗口有关的概念,就目前来看它们并没有太多的关联性,这一点我们在后续会剖析它们如何关联起来实现完整的窗口机制的。
微信扫码关注公众号:Apache_Flink
Apache Flink源码解析之stream-window
标签:
原文地址:http://blog.csdn.net/yanghua_kobe/article/details/51367641