码迷,mamicode.com
首页 > 其他好文 > 详细

Flink kuduSink开发

时间:2020-05-26 20:41:12      阅读:275      评论:0      收藏:0      [点我收藏+]

标签:json   new   nta   方法   stat   kudu   未定义   hsi   put   

1、继承RichSinkFunction

(1)首先在构造方式传入kudu的masterAddress地址、默认表名、TableSerializationSchema、KuduTableRowConverter、Properties配置对象

(2)重写open方法

初始化KuduClient对象操作kudu,KuduSession对象并传入一堆配置

(3)重写invoke方法

核心是如果已传入TableSerializationSchema对象,则通过其serializeTable方法从输入的json数据里提取表名,如果未定义则直接取默认表名。拿到表名后就能使用KuduClient对象对其操作了

if (schema != null) {
String serializeTableName = schema.serializeTable(row);
if (serializeTableName == null) return;
table = client.openTable(serializeTableName);
}
else
table = client.openTable(tableName);
insert = table.newInsert();

2、定义KuduTableRowConverter接口,将每一条输入数据转换成TableRow对象

public interface KuduTableRowConverter<IN> extends Serializable {
TableRow convert(IN value);
}

定义TableRow类,代表一行数据,key是字串型的键名,value是Object型的键值

public class TableRow implements Serializable {
private static final long serialVersionUID = 1L;
private Map<String, Object> pairs = new HashMap<>();
public int size() {return pairs.size();}
public Map<String, Object> getPairs() {return pairs;}
public Object getElement(String key) {return pairs.get(key);}
public void putElement(String key, Object value) {pairs.put(key, value);}
}

定义JsonKuduTableRowConverter实现KuduTableRowConverter接口,对于输入的json数据,通过一系列转换逻辑转换成TableRow对象

3、定义TableSerializationSchema接口,从每一条输入数据里提取表名

public interface TableSerializationSchema<IN> extends Serializable {
String serializeTable(IN value);
}

定义JsonLogidKeyTableSerializationSchema实现TableSerializationSchema接口,对于输入的json数据,使用指定key值提取value值,然后再从一个预先获取的map里找到这个value对应的表名,然后加上必要的前缀与后缀组成impala的表名

Flink kuduSink开发

标签:json   new   nta   方法   stat   kudu   未定义   hsi   put   

原文地址:https://www.cnblogs.com/codetouse/p/12968238.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!