标签:组成 相互转换 map 兼容 format 无法 alt extractor 注册
Flink在内部以二进制的格式将数据保存,由于普通的Java对象类型和内部二进制格式不一致,需要一套相互转换机制来进行序列化和反序列化。
Flink支持的物理类型如下图所示:
分为基础类型、数组类型、复合类型、辅助类型、泛型和其他类型,如果用户需要自定义类型的话,需要注册该类型并自己实现序列化和反序列化的方法。
对于没有提供的自定义类型,Flink为了程序正常运行,会交给Kryo进行序列化,缺点是序列化和反序列化效率较低。
逻辑类型是物理类型的描述,Flink运行时会根据逻辑类型进行数据的序列化和反序列化。
在Flink中使用TypeInformation来描述逻辑类型,该类是一个抽象类,所有逻辑类型继承该类,分类如下图所示:
在序列化过程中,所有逻辑类型都必须实现createSerializer(ExecutionConfig config)方法来创建序列化器。
开发者使用物理类型,Flink运行时使用逻辑类型,所以需要从物理类型转换为逻辑类型,Java使用反射机制获取Function的输入输出。
在使用DataStream接口的时候,会触发类型的提取,如下面map()方法所示:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 提取类型
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return map(mapper, outType);
}
Flink会首先进行类型推断,如果用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,来获取泛型的类型信息,如下:
TypeInformation.of(new TypeHint<Tuple2<Integer,String>>(){})
一般情况下,可以通过TypeInformation.of()方法来显式创建一个类型信息的对象,如下:
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
由于泛型的类型擦除导致类型提取不能总是有效,所以有时候需要自己手动指定。
使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而不要使用com.esotericsoftware.kryo.serializers.JavaSerializer,防止和Flink不兼容。
Flink1.9之后引入了新的SQL类型系统,解决了DataStream在SQL中的兼容性、精度、类型等问题。
Flink SQL中使用DataType中的LogicalType类型系统来描述类型信息,在Flink SQL执行时,最终还是要转换为TypeInformation。
Row表示表中的一行数据或者一条记录,在1.9版本之前,Flink SQL使用org.apache.flink.types.Row,在1.9版本之后,使用org.apache.flink.table.dataformat.BaseRow及其子类,下面主要介绍Blink Row。
Blink Row分为列式存储和行式存储,结构如下所示:
区别如下:
标签:组成 相互转换 map 兼容 format 无法 alt extractor 注册
原文地址:https://www.cnblogs.com/jordan95225/p/13882170.html