标签:两种 art 版本 imp 原理 use nal default class
Spark1.6只能创建临时UDF,不支持创建持久化的UDF。
从Spark-2.0开始,SparkSQL支持持久化的UDF,目前看来是支持UDAF
(1)自定义UDF类,实现UDF1/2/3....22中的接口之一,其中UDF后跟的数字,
比如UDF1、UDF2;表示输入参数的个数,1表示有一个入参,2表示有两个入参,
最多可传入22个输入参数
实现 call()方法
两种方式 : 过匿名函数 和 通过实名函数
(2)注册UDF函数: SparkSQL UDF 两种方式:udf() 和 register()
01.
Spark1.x: sqlContext.udf.register
Spark2.x: spark.udf.register
org.apache.spark.sql SparkSession
def udf: UDFRegistration = sessionState.udfRegistration
02. DataFrame的udf方法在 org.apache.spark.sql.functions 里
org.apache.spark.sql
functions.{col, lit, udf}
spark.sql.function.udf() 方法 此时注册的方法,对外部可见
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = { UserDefinedFunction(f, dataType, None)}
(3) 目前使用UDAF可以在SQL中,而Spark UDF使用,在Spark 任务中可以执行,在SparkSQL任务中执行报错
import org.apache.spark.sql.api.java.UDF1;
public class StringLenUDF implements UDF1<String , Integer> {
@Override
public Integer call(String value) throws Exception {
Integer data = value.length();
return data;
}
}
(3)在Spark中使用
spark.udf().register("trans_len",new StringLenUDF(),DataTypes.IntegerType);
String sqlTex = "select date,trans_len(date ) mem_distinct_cnt " +
"from mem_data_table ";
spark.sql(sqlTex).show();
(3) SQL 中使用UDF
打包:
ADD jar sparkudf.jar;
CREATE TEMPORARY FUNCTION trans_len AS ‘com.test.txt.udf.StringLenUDF‘;
select t1.data,trans_len(t1.data) as uid_bitmap_byte
from (
select 100 as user_id, ‘2020‘ as data
union all
select 200 as user_id, ‘2019‘ as data) t1
注释:
情景: Spark SQL 在SQL中使用 Spark的UDF报错
报错: Error in query: No handler for UDF/UDAF/UDTF ‘com.test.txt.udf.LenCountUDF‘; line 3 pos 15
原因:
SessionCatalog calls registerFunction to add a function to function registry.
However, makeFunctionExpression supports only UserDefinedAggregateFunction.
参考:
Default SessionCatalog should support UDFs https://issues.apache.org/jira/browse/SPARK-25334
Hive中Binary类型于0.8版本以上开始支持。
https://stackoverflow.com/questions/52164488/spark-hive-udf-no-handler-for-udaf-analysis-exception
报错的 org.apache.spark.sql.hive.HiveSessionCatalog
// Construct a [[FunctionBuilder]] based on the provided class that represents a function.
private def makeFunctionBuilder
throw new AnalysisException(s"No handler for Hive UDF ‘${clazz.getCanonicalName}‘")
org.apache.spark.sql.catalyst.catalog.SessionCatalog
SessionCatalog calls registerFunction to add a function to function registry.
//Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
def registerFunction(
//Drop a temporary function.
def dropTempFunction
介绍:
Spark SQL 中的 Catalog 体系实现以 SessionCatalog 为主体,通过 SparkSession (Spark 程序入口)提供给外部调用。
一般一个 SparkSession 对应一个 SessionCatalog。
本质上, Session Catalog 起到了一个代理的作用,对底层的元数据信息、临时表信息、视图信息和函数 信息进行了封装。
HiveSessionCatalog 继承了Spark的默认SessionCatalo
/*** A catalog for looking up user defined functions, used by an [[Analyzer]].*/
trait FunctionRegistry {
SparkSession解析SessionCatalog、SharedState和SessionState https://blog.csdn.net/qq_41775852/article/details/105157879
标签:两种 art 版本 imp 原理 use nal default class
原文地址:https://www.cnblogs.com/ytwang/p/14023589.html