码迷,mamicode.com
首页 > 其他好文 > 详细

flink 问题记录

时间:2017-10-11 20:33:02      阅读:269      评论:0      收藏:0      [点我收藏+]

标签:this   1.0   eve   apply   can   new   fun   hid   ast   

 转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7652337.html

1 WindowFunction类型不匹配无法编译。

flink 版本:1.3.0

参考https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation写的demo发现reduce加入MyWindowFunction后编译不通过,报错参数类型不匹配。

代码如下

技术分享
        MyReduceFunction a = new MyReduceFunction();
        
        

        DataStream<Tuple3<String, String, Integer>> counts4 = source

                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time
                        .of(1, TimeUnit.SECONDS)))
                .reduce(a,
                        new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public void apply(
                                    String key,
                                    TimeWindow window,
                                    Iterable<Tuple2<String, Integer>> values,
                                    Collector<Tuple3<String, String, Integer>> out)
                                    throws Exception {
                                for (Tuple2<String, Integer> in : values) {
                                    out.collect(new Tuple3<>(in.f0, in.f0,
                                            in.f1));
                                }
                            }
                        });    



public static class MyReduceFunction implements
            ReduceFunction<Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) throws Exception {
            // TODO Auto-generated method stub
            return new Tuple2<String, Integer>(value1.f0,
                    (value1.f1 + value2.f1));
        }

    }
View Code

  上述代码在reduce函数加入WindowFunction后代码一直报错,显示reduce函数包含的参数类型不匹配。其实原因出在keyBy(0)这个用法上,DataStream在调用public KeyedStream<T, Tuple> keyBy(int... fields) 和public KeyedStream<T, Tuple> keyBy(String... fields) 这两个方法的时候会调用

private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}

其中,KeySelectorUtil.getSelectorForKeys返回的是一个ComparableKeySelector类型的KeySelector,而这个类的定义为

   public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> 

根据KeySelector的定义可知,ComparableKeySelector输出的所有key类型都为Tuple,所以上述WindowFunction设置的第三个泛型参数String是不对的。

 

解决办法

1 自定义KeySelector

技术分享
    private static class TupleKeySelector implements
            KeySelector<Tuple2<String, Integer>, String> {

        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }


        DataStream<Tuple3<String, String, Integer>> counts4 = source

                .keyBy(new TupleKeySelector())
View Code

上面首先定义了一个TupleKeySelector,返回Key类型为String,然后keyby的参数设置为对应的new TupleKeySelector(),表示keyStream根据一个String类型的Key分区

2  WindowFunction第三个Key泛型由String改为Tuple

 

参考问题:https://stackoverflow.com/questions/36917586/cant-apply-custom-functions-to-a-windowedstream-on-flink

 

flink 问题记录

标签:this   1.0   eve   apply   can   new   fun   hid   ast   

原文地址:http://www.cnblogs.com/dongxiao-yang/p/7652337.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!