标签:ports 这一 dad 还原 逻辑 简单的 可读性 operator tin
MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。 MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。
MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章
第五弹向您介绍了MaxCompute如何嵌入其他语言的脚本。SELECT TRANSFORM的优势在于可以不创建function甚至不上传资源的情况下执行其他语言的脚本,而即使需要编写资源也没有任何由MaxCompute规定的格式要求和依赖。
本文将介绍另一种将这一优势提升到更高层次的新功能:User Defined Type,简称UDT。
上述场景的问题,通过UDT能够非常好地解决,接下来将具体介绍UDT使用。
本文中很多例子采用MaxCompute Studio作展示,没有安装MaxCompute Studio的用户,可以参照文档安装MaxCompute Studio),导入测试MaxCompute项目,创建工程。
MaxCompute中的UDT(User Defined Type)功能支持在SQL中直接引用第三方语言的类或者对象,获取其数据内容或者调用其方法 。
在其他的SQL引擎中也有UDT的概念,但是和MaxCompute的概念有许多差异。很多SQL引擎中的概念比较像MaxCompute的struct复杂类型。而某些语言提供了调用第三方库的功能,如Oracle 的 CREATE TYPE。相比之下,MaxCompute的UDT更像这种CREATE TYPE的概念,Type中不仅仅包含数据域,还包含方法。而且MaxCompute做的更彻底:开发者不需要用特殊的DDL语法来定义类型的映射,而是在SQL中直接使用。
一个简单的例子如下:
上面的例子输出:
和java语言一样,java.lang这个package是可以省略的。所以上面例子更可以简写为:
可以看到,上面的例子在select列表中直接写上了类似于java表达式的表达式,而这个表达式的确就按照java的语义来执行了。这个例子表现出来的能力就是MaxCompute的UDT。
UDT所提供的所有扩展能力,实际上用UDF都可以实现。譬如上面的例子,如果使用UDF实现,需要做下列操作。
首先,定义一个UDF的类:
然后,将上面的UDF编译,并打成jar包。然后再上传jar包,并创建function
最后才可以在sql中使用
UDT相当于简化了上述一系列的过程,让开发者能够轻松简单地用其他语言扩展SQL的功能。
上述例子表现的是java静态域访问的能力,而UDT的能力远不限于此。譬如下面的例子:
上述例子输出结果 100000000000000000100。
这个例子还表现了一种用UDF比较不好实现的功能:子查询的结果允许UDT类型的列。例如上面变量a的x列是java.math.BigInteger类型,而不是内置类型。UDT类型的数据可以被带到下一个operator中再调用其他方法,甚至能参与数据shuffle。比如上面的例子,在MaxCompute studio中的执行图如下:
可以看出图中共有三个STAGE: M1, R2 和 J3。熟悉MapReduce原理的用户会知道,由于join的存在需要做数据reshuffle,所以会出现多个stage。一般情况下,不同stage不仅是在不同进程,甚至是在不同物理机器上运行的。双击代表M1的方块,显示如下:
可以看到,M1仅仅执行了 new java.math.BigInteger(x) 这个操作。而同样点开代表J3的方块,可以看到 J3 在不同的阶段执行了 java.math.BigInteger.valueOf(y) 的操作,和 x.add(y).toString() 的操作:
这几个操作不仅仅是分阶段执行的,甚至是在不同进程,不同物理机器上执行的。但是UDT把这个过程封装起来,让用户看起来和在同一个JVM中执行的效果几乎一样。
UDT同样允许用户上传自己的jar包,并且直接引用。如上面UDF的jar包。用UDT来使用:
如果觉得写 package全路径麻烦,还可以像java的import一样,用flag来指定默认的package。
提供一些提升使用效率的flag:
set odps.sql.session.resources=foo.sh,bar.txt;
注意这个flag和SELECT TRANSFORM中指定资源的flag相同,所以这个flag会同时影响SELECT TRANSFORM和UDT两个功能。*
。暂不支持static import。UDT支持的操作包括:
new Integer[] { 1, 2, 3 }
。注意:
String.valueOf(1) + String.valueOf(2)
的结果是 3 (string隐式转换为double,并且double相加),而不是‘12‘ (java中string相加是concatenate的语义)。=
操作。SQL中的 =
不是赋值 而是判断相等。而对于java对象来说,判断相等应该用equals方法,通过等号判断的相等无法保证其行为(在UDT场景下,同一对象的概念是不能保证的,具体原因参考下述第8点)。内置类型与特定java类型有一一映射关系,见UDF类型映射。这个映射在UDT也有效:
‘123‘.length() , 1L.hashCode()
。chr(Long.valueOf(‘100‘))
,其中 Long.valueOf 返回的是 java.lang.Long 类型的数据,而内置函数chr接受的数据类型是内置类型BIGINT。set odps.sql.type.system.odps2=true;
才能使用的。否则会报错。java.util.Arrays.asList(new java.math.BigInteger(‘1‘))
,编译器能够根据参数类型知道该方法的返回值是 java.util.List<java.math.BigInteger>
类型java.lang.Object
,这一点和java保持一致:new java.util.ArrayList(java.util.Arrays.asList(‘1‘, ‘2‘))
的结果是 java.util.ArrayList<Object>
类型;
而 new java.util.ArrayList<String>(java.util.Arrays.asList(‘1‘, ‘2‘))
的结果是 java.util.ArrayList<String>
类型。
UDT对 "同一对象" 的概念是模糊的。这是由数据的reshuffle导致的。从上面第一部分的join的示例可以看出,对象有可能会在不同进程,不同物理机器之间传输,在传输过程中同一个对象的两个引用后面可能分别引用了不同的对象(比如对象先被shuffle到两台机器,然后下次又shuffle回一起)。
= operator
来判断相等,而是使用 equals
方法。group by new java.math.BigInteger(‘123‘)
,但是可以 group by new java.math.BigInteger(‘123‘).hashCode()
。因为hashCode的返回值是int.class类型可以当做内置类型int来使用(应上述“内置类型与特定java类型映射”的规则)。注意:这个限制未来的版本会计划去掉。
UDT扩展了类型转换规则:
目前UDT对象不能落盘。这意味着不能将UDT对象insert到表中(实际上DDL不支持UDT,创建不出来这样的表),当然,隐式类型转换变成了内置类型的除外。同时,屏显的最终结果也不能是UDT类型,对于屏显的场景,由于所有的java类都有toString()方法,而java.lang.String
类型是合法的。所以debug的时候,可以用这种方法来观察UDT的内容。
set odps.sql.udt.display.tostring=true;
这样MaxCompute会自动把所有的以UDT为最终输出的列wrap上 java.util.Objects.toString(...)
,从而方便调试。这个flag只对屏显语句生效,对insert语句不生效,所以专门用在调试中。UDT的runtime自带一个gson的依赖(2.2.4)。因此用户可以直接使用gson
相比于get_json_object,上述用法不仅仅是使用方便了,在需要对json字符串多个部分做内容提取时,先将gson字符串反序列成格式化数据,其效率要高得多。
除了GSON, MaxCompute runtime自带的依赖还包括: commons-logging(1.1.1), commons-lang(2.5), commons-io(2.4),protobuf-java(2.4.1)。
内置类型array和map 与 java.util.List 和 java.util.Map 存在映射关系。结果就是:
还可以实现一些特殊的功能,比如 array的distinct
UDT实现聚合的原理是,先用COLLECT_SET 或 COLLECT_LIST 函数将数据转变成 List, 之后对该List应用UDT的标量方法求得这一组数据的聚合值。
如用下面的示例实现对BigInteger求中位数(由于数据是 java.math.BigInteger类型的,所以不能直接用内置的median函数)
由于collect_list会先把所有数据都收集到一块,是没有办法实现partial aggregate的,所以这个做法的效率会比内置的aggregator或者udaf低,所以 在内置aggregator能实现的情况下,应尽量使用内置的aggregator 。同时把一个group的所有数据都收集到一起的做法,会增加数据倾斜的风险。
但是另一方面,如果UDAF本身的逻辑就是要将所有数据收集到一块(比如类似wm_concat的功能),此时使用上述方法,反而可能比UDAF(注意不是内置aggregator)高。
表值函数允许输入多行多列数据,输出多行多列数据。可以按照下述原理实现:
下述示例实现将一个json字符串的内容展开出来的功能
我们知道在UDF中可以通过ExecutionContext对象来读取资源文件。现在UDT也可以通过 com.aliyun.odps.udt.UDTExecutionContext.get()
方法来或者这样的一个 ExecutionContext 对象。
下述示例将资源文件 1.txt 读取到一个string对象中,并输出:
UDT对象默认是不支持落盘的。但是有方法能够把UDT的对象持久化。基本的思想是将数据序列化成为binary或者string来做持久化,或者将udt对象展开,持久化里面的能转成内置类型的关键数据。
如下UDT定义:
将对象展开成内置类型:
需要用时再重新构造:
或者将对象serialize成binary。
平展开的最大问题是,序列化和反序列化的麻烦。当然可以直接转成binary。如改造Shape类:
如果直接利用已有的框架,也许会更方便。如 Shape 是用 ProtoBuffer 定义的
SQL中直接调用pb的方法
本功能和 MaxCompute Studio 搭配着使用,才能发挥其最大的价值。
功能方面,UDT的优势是显而易见的:
在性能方面,UDT执行过程和UDF非常接近,其性能与UDF几乎是一致的,而且产品针对UDT做了很多优化,在某些场景下UDT的性能甚至略高一筹:
values[x].add(values[y]).divide(java.math.BigInteger.valueOf(2))
这个看似存在多次UDT方法调用的操作,实际上只有一次调用。所以虽然UDT操作的单元都比较小,但是并不会因此造成多次函数调用的接口上的额外开销。在安全控制方面,UDT和UDF完全一样。即都会受到沙箱policy的限制。所以如果要使用受限的操作,需要打开沙箱隔离,或者申请沙箱白名单。
本文从使用的角度介绍了UDT的功能。UDT能够在SQL中直接写java的表达式,并可以引用jdk中的类。这一功能极大地方便扩展SQL的功能。
当然,UDT的功能还有许多功能还有待完善。文中也提到了几点有待完善的功能:
本文作者:海清
本文为阿里云内容,未经允许不得转载。
MaxCompute - ODPS重装上阵 第六弹 - User Defined Type
标签:ports 这一 dad 还原 逻辑 简单的 可读性 operator tin
原文地址:https://www.cnblogs.com/zhaowei121/p/12084196.html