标签:
窗口(Window)
本文翻译自文档Windows
-----------------------------------
Flink使用窗口的概念,根据element的时间戳或者其他指标,将可能无限的DataStream分割为有限的数据切片(slice)。我们在处理无限数据流以及进行聚合element的transformation时需要此种窗口分割。
注意:我们在此文档中讨论的大多是keyed windowing,即window是应用在KeyedStream上的。关键字下的窗口具有一定的优势,即它可以在element传递给user function之前就能按照window和关键字共同二次分割element。由于不同关键字的element可以相互独立处理,所以该工作可以在cluster之上分布式进行。有关non-keyed window的信息,请查看文档non-keyed windowing
一个带窗口的transformation至少需要定义一个key(见文档specifying keys)、一个window assigner以及一个window function。key将无限而无关键字的流分割成逻辑的有关键字数据流,而window assigner将element赋值给有限的各自关键字的窗口(per-key window)。最后window function会用于处理每个窗口的element。
带窗口transformation的基础结构如下所示:
DataStream<T> input = ...;
input.keyBy(<key selector>)
.window(<window
assigner>)
.<windowed transformation>(<window
function>);
我们会在接下来的一节中单独讲window assigners。
Window transformation可以是reduce(),fold()或者apply()之一,它们相对应的需要一个ReduceFunction,FoldFunction或WindowFunction。我们将会在下文window functions中具体描述定义一个带窗口transformation的不同方式。
在更进一步的用例中,你可以定义一个Trigger来决定一个窗口什么时候才是ready for processing的。相关详细内容见于本文triggers小节。
Window assigner定义了数据流的element将如何分割进有限的数据切片。Flink自带预先实现了针对多数典型用例的window assigner,以tumbling window,sliding window,session window和global window命名,此外,你还可以通过继承WindowAssigner类来自定义自己的window assigner。除了global window,所有自带window assigner都是基于时间(可以是processing time或者event time)来分配element。有关Flink如何处理时间,请见文档event time。
在描述这些window assigner如何用于Flink程序之前,我们先描述它们的工作机制。我们将使用抽象图来可视化每个assigner的工作机制:在下面的内容中,紫色圈是数据流的element,它们以不同的关键字进行分割(在该例中关键字为user1, user2, user3),x轴表示时间的进展。
Global Window的定义表明我们不会进一步将element二次分割到窗口中。每个element将被分配到一个单独的各自关键字的窗口中。该窗口化模式仅仅在同时拥有一个自定义trigger时才有用。否则由于global window没有用以聚合element的常态结束,所以不会发生任何计算。
Tumbling window assigner将element分配到一个固定长度、无重叠的窗口,且该窗口的window size有用户定义。例如,如果你定义window size为5分钟,window function每次调用都会得到5分钟的element。
Sliding window assigner和tumbling window一样,将element分配到一个固定长度(等于window size),但该窗口可重叠,重叠的大小由用户定义的参数window slide定义。由于窗口可重叠,故一个element可以分配到多个窗口中去。
例如,你可以定义一个window size为10分钟,且slide为5分钟。在该窗口中,每此window function会得到10分钟的element,且5分钟调用一次。
Session window assigner在窗口边界需要根据到达数据调整的情况下十分适用。Tumbling windows和sliding windows的assigner都将element分配到开始于固定时间点并且拥有固定window size的窗口中。而在session中,你可以让关键字窗口开始于它们自己的时间点,并且在一段无活动情况(inactivity)出现时结束窗口。该窗口的配置参数session gap定义了等待新数据多长时间就结束一个session。
除了GlobalWindows,内置window assigner都有两个版本,一个处理processing-time windowing,另一个处理event-time windowing。Processing-time assigner根据worker设备的当前时钟来分配element,而event-time assigner根据element的时间戳来分配窗口。有关processing time和event time的区别以及如何给element分配时间戳的内容,请见文档event time。
下面的代码片段展示了在程序中如何使用每个window assigner:
DataStream<T> input = ...;
//
tumbling event-time windows
input
.keyBy(<key
selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window
function>);
//
sliding event-time windows
input
.keyBy(<key
selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10),
Time.seconds(5)))
.<windowed transformation>(<window
function>);
//
event-time session windows
input
.keyBy(<key
selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window
function>);
//
tumbling processing-time windows
input
.keyBy(<key
selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window
function>);
//
sliding processing-time windows
input
.keyBy(<key
selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),
Time.seconds(5)))
.<windowed transformation>(<window
function>);
//
processing-time session windows
input
.keyBy(<key
selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window
function>);
//
global windows
input
.keyBy(<key
selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window
function>);
注意:我们可以通过Time.millisecond(x),Time.second(x),Time.minutes(x)等等方法来定义时间。
在系统确定一个窗口已经做好了处理的准备时(有关系统是如何确定窗口可处理,见文档trigger),Flink将使用Window Function处理每个窗口中的element。
Window Function可以是ReduceFunction、FlodFunction或WindowFunction。由于Flink在element到达各自窗口时递增地聚合它们,所以前两个方法可以更加高效地执行。WindowFunction获取窗口中所有element的Iterable以及这些element所属的窗口的额外元信息(meta information)。
由于Flink在调用使用了WindowFunction的窗口化transformation之前,必须在内部缓存一个窗口中所有element,所以此种transformation无法像其他情况一样高效执行。我们可以将WindowFunction和一个ReduceFunction或FoldFunction相结合,从而在递增聚合窗口中element的同时,还可以获取WindowFunction接收的额外信息,通过这种方式,我们可以缓解上述问题。我们在下面会对各种情况一一举例。
一个reduce方法定义了两个值如何结合形成一个element。Flink可以使用它来递增地聚合窗口中的element。
程序中的ReduceFunction如下例:
DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(<key
selector>)
.window(<window
assigner>)
.reduce(new
ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String,
Long> reduce(Tuple2<String,
Long> v1, Tuple2<String, Long> v2)
{
return new Tuple2<>(v1.f0,
v1.f1 + v2.f1);
}
});
一个ReduceFunction定义了两个输入中的element是如何结合产生一个输出element的。在上面的例子中,将会计算出一个窗口中所有element的第二个域的总和。
一个fold方法可以定义如下:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.fold("",
new FoldFunction<Tuple2<String, Long>,
String>> {
public String fold(String
acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
一个FoldFunction定义了输入中的element如何加上一个累加初始值(在本例中为"",即空字符串)。在上例中,将会计算出输入中所有Long域的字符串连接(concatenation)结果。
WindowFunction以性能开销的增加,换来了最大的灵活性(它可以获得key和Window的引用)。WindowFunction由于无法递增聚合窗口中的element,在窗口准备好处理之前,Flink都必须内部缓存整个窗口,所以带来性能上的开销。一个WindowFunction将得到处理的窗口中所有element的Iterable。WindowFunction接口的签名如下所示:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window
and outputs none or several elements.
*
* @param key The key
for which this window is evaluated.
* @param window The
window that is being evaluated.
* @param input The
elements in the window being evaluated.
* @param out A
collector for emitting elements.
*
* @throws Exception The
function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY
key, W window, Iterable<IN> input,
Collector<OUT> out) throws Exception;
}
下面我们举例使用WindowFunction来计算一个窗口中element数量。我们之所以选择WindowFunction,是因为我们想在发送计数的同时,还想访问并一同发送有关窗口的信息。这将非常低效,我们应当在练习中和一个ReduceFunction一同实现WindowFunction。在下一节中,我们将会看到将ReduceFunction和WindowFunction结合来获取递增地聚合以及之前添加的WindowFunction的信息
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.apply(new
MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void
apply(String
key, TimeWindow window, Iterable<Tuple<String,
Long>> input, Collector<String> out)
{
long
count = 0;
for (Tuple<String,
Long> in: input) {
count++;
}
out.collect("Window:
" + window + "count:
" + count);
}
}
一个WindowFunction可以与ReduceFunction或者FoldFunction结合,在结合后,ReduceFunction/FoldFunction将会用来在窗口的element到达时递增聚合它们,而WindowFunction则会在窗口准备好处理时得到已经聚合后的结果。这种方式使我们可以即获得窗口递增计算的优势,又可以获得编写一个WindowFunction提供的额外窗口元信息。
下面的例子为我们展示了递增聚合方法如何与WindowFunction结合:
DataStream<Tuple2<String, Long>> input = ...;
//
for folding incremental computation
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.apply(<initial
value>, new MyFoldFunction(),
new MyWindowFunction());
//
for reducing incremental computation
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.apply(new
MyReduceFunction(), new MyWindowFunction());
在处理事件时间窗口时,可能会发生element迟到的情况,即Flink用来持续跟踪事件时间进展的watermark已经晚于到达element所属的窗口的结束时间戳了的情况。有关event time和其中迟到element等等Flink如何处理event time的详细讨论,请见event time和late element。
你可以定义一个带窗口transformation如何处理迟到element以及允许的迟到时间(lateness)。相关的参数为allowed lateness,该参数定义了element最多可以迟到多长时间。对于在allowed lateness之内到达的element,Flink仍然会将它们放入窗口中并且考虑到计算结果之内。而在allowed lateness之外到达的element则将被抛弃。Flink同样保证一旦Watermark超过窗口结束时间加上allowed lateness,由窗口Operation持有的所有状态都将进入垃圾回收。
默认地,allowed lateness设置为0,即在watermark之后到达的element将会被抛弃。你可以通过以下方式定义allow lateness:
DataStream<T> input = ...;
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window
function>);
注意,当使用GlobalWindows的window assigner时,没有数据会变为迟到数据,因为全局窗口的结束时间戳为Long.MAX_VALUE
一个Trigger决定着窗口(由WindowAssigner赋值)什么时候准备好由window function处理。trigger观察element如何加入窗口中,并且持续跟踪processing time和event time的进展。一旦一个trigger决定窗口已准备好处理,它就会被触发。这是Window Operation获取当前处于窗口中的数据,并且将它们传递给window function来产生处于触发状态的(firing)窗口的输出的信号。
除了GlobalWindows,每个WindowAssigner都带有一个默认trigger来适用于绝大多数用例。例如,TumblingEventTimeWindows拥有EventTimeTrigger作为默认trigger,该trigger简单地在watermark超过窗口的结束时间时触发。
你可以通过使用给定Trigger类来调用trigger()方法定义所用的trigger。一个带窗口transformation的全貌大致如下:
DataStream<T> input = ...;
input
.keyBy(<key
selector>)
.window(<window
assigner>)
.trigger(<trigger>)
.<windowed transformation>(<window
function>);
Flink自带一些开箱即用的trigger:包括上面提到的EventTimeTrigger,基于由watermark衡量的event time的进展来决定是否触发;还有ProcessingTimeTrigger,与EventTimeTrigger大致一样,但基于processing time;最后是CountTrigger,在一个窗口的element数量溢出给定界限时触发。
注意,通过使用trigger()方法定义一个trigger,你将重写WindowAssigner的默认trigger。例如,若你为TunmblingEventTimeWindows定义CountTrigger为trigger,窗口将不会基于时间进展触发,而仅仅依靠计数结果来触发。在当前版本下,若你想要同时对时间和计数都做出响应,你只能自定义trigger。
内部Trigger API在当前版本仍然处于测试阶段,但如果你想要编写自定义trigger,请检出(check out)该代码。Trigger.java
你同样可以在定义一个带窗口transformation时忽略KeyBy()方法,该方式将使Flink无法并行处理各自不同key的窗口,本质上将transformation变成了一个非并行的Operation。
警告:正如本文开始介绍中提到的,由于non-keyed windows不能分各自key独立计算,所以有着无法在集群上分布式运行的缺陷,这将带来一些性能上的影响。
一个带non-keyed window的transformation的基本结构如下代码所示:
DataStream<T> input = ...;
input
.windowAll(<window
assigner>)
.<windowed transformation>(<window
function>);
Flink Program Guide (6) -- 窗口 (DataStream API编程指导 -- For Java)
标签:
原文地址:http://www.cnblogs.com/lanyun0520/p/5774831.html