标签:default func 适用于 推理 设置 运算 res 网络 关注
跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数:
完全展开之后共有五种,这也符合ANSI-standard SQL对连接种类的划分。
下文当我们提及“join”时,主要指equi-join,而当我们想表达outer-join时,我们会直接使用“外连接”,当我们想泛指时,我们将使用“连接”这个词。
常用来实现连接的算法有:hash join、sort-merge join以及nested loop join,下面我们对这三种算法进行简单介绍。首先,当基于hash算法实现连接时,通常划分为两个阶段:
而使用sort-merge算法实现连接时,通常也划分为两个阶段:
nested loop实现连接相对更容易理解,它使用两层嵌套循环分别作用于两个参与连接的数据集。
在Flink的DataSet API中,hash和sort-merge算法都可被选择用于实现join和outerJoin,而nested loop只用于实现cross join。
通过上面的介绍,我们得知当选择hash算法来实现连接时,需要确定以哪个输入端作为build端,哪个输入端作为probe端,这是影响其执行效率的因素之一(因为通常选择数据量较小的数据集作为build端)。因此,以hash算法来实现连接时,而不同的选择显然对应着不同的运算符描述器,列举如下:
而当以sort-merge算法来实现连接时,不会区分输入端的特殊职责,也就不存在build阶段和probe阶段,因此运算符描述器只有如下四种:
以上这么多运算符描述器,主要是为它们设置不同的执行策略(DriverStrategy),不同的执行策略直接导致了不同的执行成本。
为了理清算法跟参与连接的输入端的关系,Flink将它们区分成两种不同策略的:本地策略以及传输(ship)策略。其中传输策略表示如何移动两个输入端中的数据使得它们具备连接的条件;本地策略则指两个已在本地的输入端数据集所执行的连接算法。
我们来解释一下这两种策略,假设有两个待连接的数据集(R和S)。传输策略有如下两种:
正如上面已经提及的,本地策略也即连接的实现算法也有两种:
在不指定“Hint”的情况下,Flink在进行批处理优化时会根据成本自动选择传输策略以及本地策略。优化器的一个关键特征是它会根据已经存在的数据属性来进行推理。就连接运算而言,如果某一个输入端的数据量远小于另一输入端,Flink会倾向于选择BF传输策略,将较小的输入端广播给较大的输入端的每一个分区,并在本地策略中选择HH且以较小的输入端作为HH的构建端;如果优化器得知某个(或两个)输入端已排好序,那么生成的候选计划将不再重分区该输入端,此时它更倾向于选择RR传输策略以及SM本地策略。
除了优化器的自动选择,当用户对数据集非常了解的情况下,Flink定义了JoinHint允许用户为join(inner join)指定连接策略给予优化器提示。JoinHint提供了人为选择连接策略的灵活性,其使用方式有两种,一种是直接指定两个输入端的大小:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 = input1.joinWithTiny(input2) //提示优化器第二个数据集比第一个数据集小得多
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 = input1.joinWithHuge(input2) //提示优化器第二个数据集比第一个数据集大得多
.where(0)
.equalTo(0);
另一种是直接指定连接策略:
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
当前有如下的这些策略可供选择:
对应到优化器中,JoinHint被用来指定创建何种运算符描述器,由于JoinHint只适应于join,所以它只对应如下这些运算符描述器:
因此,如果用户给出了JoinHint,则数据属性(其实这里主要是DriverStrategy)会通过以上三种运算符描述器来提供:
joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
switch (joinHint) {
case BROADCAST_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
break;
case BROADCAST_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
break;
case REPARTITION_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_SORT_MERGE:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
break;
case OPTIMIZER_CHOOSES:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
break;
default:
throw new CompilerException("Unrecognized join hint: " + joinHint);
}
由代码段可见,当将选择权交给优化器时,它会将三种运算符描述器都作为数据属性,供后续生成候选计划时再剔除。
除了针对join的提示外,Flink还提供了针对求交叉连接的提示CrossHint,该提示主要是针对输入端的数据量大小。使用示例如下:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple4<Integer, String, Integer, String>>
udfResult = input1.crossWithTiny(input2) //提示第二个数据集非常小
// apply any Cross function (or projection)
.with(new MyCrosser());
DataSet<Tuple3<Integer, Integer, String>>
projectResult = input1.crossWithHuge(input2) //提示第二个数据集非常大
// apply a projection (or any Cross function)
.projectFirst(0,1).projectSecond(1);
不同于Join提示,Cross提示被表述为不同的API。从代码层面上来看,CrossHint有三个枚举值:
在创建相关运算符描述器CrossHint被用来指定特定的构造参数,比如是允许第一个输入端广播还是第二个输入端广播。交叉连接的实现算法为nested-loop,关于运算符描述器,考虑到以哪个数据集作为内、外层循环以及以阻塞模型还是流模型来处理这两个因素,有四种实现:
且需要注意的是,不同的处理模型,哪个输入端作为内外循环是相反的:
else if (hint == CrossHint.SECOND_IS_SMALL) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterSecondDescriptor(false, true));
list.add(new CrossStreamOuterFirstDescriptor(false, true));
this.dataProperties = list;
}
else if (hint == CrossHint.FIRST_IS_SMALL) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterFirstDescriptor(true, false));
list.add(new CrossStreamOuterSecondDescriptor(true, false));
this.dataProperties = list;
}
但广播哪个输入端是一致的。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
标签:default func 适用于 推理 设置 运算 res 网络 关注
原文地址:http://blog.csdn.net/yanghua_kobe/article/details/70666307