Flink 1.10 adds support for SQL DDL. This article walks through the whole flow with an example that consumes data from Kafka and writes it to a JDBC sink. The concrete steps are as follows:

4. Create the source table with DDL
CREATE TABLE source_table (
  id BIGINT,
  name STRING,
  score BIGINT
) WITH (
  'connector.type' = 'kafka',                                    -- use the kafka connector
  'connector.version' = 'universal',                             -- kafka version; 'universal' covers 0.11 and later
  'connector.topic' = 'flink-ddl-test',                          -- topic
  'connector.properties.zookeeper.connect' = 'localhost:2181',   -- zookeeper address
  'connector.properties.bootstrap.servers' = 'localhost:6667',   -- broker address
  'format.type' = 'json'                                         -- data format
);
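For a quick smoke test, a JSON record whose fields match this schema can be published to the flink-ddl-test topic. Below is a minimal sketch using the standard Kafka Java client; the kafka-clients dependency is assumed to be on the classpath, and the sample values are made up:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProduceTestRecord {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address taken from the DDL above
        props.put("bootstrap.servers", "localhost:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the JSON fields must match the columns declared in source_table
            String value = "{\"id\": 1, \"name\": \"flink\", \"score\": 100}";
            producer.send(new ProducerRecord<>("flink-ddl-test", value));
            producer.flush();
        }
    }
}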
5. Create the sink table with DDL

CREATE TABLE sink_table (
  id BIGINT,
  name STRING,
  score BIGINT
) WITH (
  'connector.type' = 'jdbc',                        -- use the jdbc connector
  'connector.url' = 'jdbc:mysql://localhost/test',  -- jdbc url
  'connector.table' = 'my_test',                    -- target table name
  'connector.username' = 'root',                    -- username
  'connector.password' = '123456',                  -- password
  'connector.write.flush.max-rows' = '1'            -- rows buffered before flushing; default is 5000
);
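The JDBC connector in Flink 1.10 writes into an existing table; it does not create my_test for you. The following is a minimal sketch that creates the target table over plain JDBC, assuming a MySQL column layout that mirrors sink_table (the exact MySQL types are an assumption):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateSinkTable {
    public static void main(String[] args) throws Exception {
        // connection settings taken from the sink DDL above
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost/test", "root", "123456");
             Statement stmt = conn.createStatement()) {
            // assumed schema mirroring sink_table
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS my_test ("
                    + " id BIGINT,"
                    + " name VARCHAR(255),"
                    + " score BIGINT)");
        }
    }
}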
6. Execute the INSERT statement to submit the Flink job
INSERT INTO sink_table SELECT id, name, score FROM source_table;
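The article does not show how these statements are actually submitted. Below is a minimal sketch of a Flink 1.10 job that registers both tables and runs the INSERT with the Blink planner; the class name and job name are made up, and the flink-json, flink-sql-connector-kafka, flink-jdbc and MySQL driver dependencies are assumed to be on the classpath (the DDL strings simply repeat the statements above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaToJdbcDdlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // register the Kafka source table (same DDL as above)
        tableEnv.sqlUpdate("CREATE TABLE source_table ("
                + " id BIGINT, name STRING, score BIGINT"
                + ") WITH ("
                + " 'connector.type' = 'kafka',"
                + " 'connector.version' = 'universal',"
                + " 'connector.topic' = 'flink-ddl-test',"
                + " 'connector.properties.zookeeper.connect' = 'localhost:2181',"
                + " 'connector.properties.bootstrap.servers' = 'localhost:6667',"
                + " 'format.type' = 'json')");

        // register the JDBC sink table (same DDL as above)
        tableEnv.sqlUpdate("CREATE TABLE sink_table ("
                + " id BIGINT, name STRING, score BIGINT"
                + ") WITH ("
                + " 'connector.type' = 'jdbc',"
                + " 'connector.url' = 'jdbc:mysql://localhost/test',"
                + " 'connector.table' = 'my_test',"
                + " 'connector.username' = 'root',"
                + " 'connector.password' = '123456',"
                + " 'connector.write.flush.max-rows' = '1')");

        // the INSERT defines the continuous query; execute() submits the job
        tableEnv.sqlUpdate("INSERT INTO sink_table SELECT id, name, score FROM source_table");
        tableEnv.execute("kafka-to-jdbc-ddl-demo");
    }
}

Once the job is running, every record produced to flink-ddl-test should show up as a row in my_test, since connector.write.flush.max-rows is set to 1.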
Original article: https://blog.51cto.com/14191021/2481464