标签:this 1.0 eve apply can new fun hid ast
转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7652337.html
1 WindowFunction类型不匹配无法编译。
flink 版本:1.3.0
参考https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation写的demo发现reduce加入MyWindowFunction后编译不通过,报错参数类型不匹配。
代码如下
MyReduceFunction a = new MyReduceFunction(); DataStream<Tuple3<String, String, Integer>> counts4 = source .keyBy(0) .window(TumblingEventTimeWindows.of(Time .of(1, TimeUnit.SECONDS))) .reduce(a, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override public void apply( String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, String, Integer>> out) throws Exception { for (Tuple2<String, Integer> in : values) { out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); } } }); public static class MyReduceFunction implements ReduceFunction<Tuple2<String, Integer>> { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(value1.f0, (value1.f1 + value2.f1)); } }
上述代码在reduce函数加入WindowFunction后代码一直报错,显示reduce函数包含的参数类型不匹配。其实原因出在keyBy(0)这个用法上,DataStream在调用public KeyedStream<T, Tuple> keyBy(int... fields) 和public KeyedStream<T, Tuple> keyBy(String... fields) 这两个方法的时候会调用
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
其中,KeySelectorUtil.getSelectorForKeys返回的是一个ComparableKeySelector类型的KeySelector,而这个类的定义为
public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple>
根据KeySelector的定义可知,ComparableKeySelector输出的所有key类型都为Tuple,所以上述WindowFunction设置的第三个泛型参数String是不对的。
解决办法
1 自定义KeySelector
private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } } DataStream<Tuple3<String, String, Integer>> counts4 = source .keyBy(new TupleKeySelector())
上面首先定义了一个TupleKeySelector,返回Key类型为String,然后keyby的参数设置为对应的new TupleKeySelector(),表示keyStream根据一个String类型的Key分区
2 WindowFunction第三个Key泛型由String改为Tuple
标签:this 1.0 eve apply can new fun hid ast
原文地址:http://www.cnblogs.com/dongxiao-yang/p/7652337.html