码迷,mamicode.com
首页 > 数据库 > 详细

flink-sql解析canal-json实现实时同步

时间:2021-07-12 18:20:39      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:cut   main   source   error   sso   res   mod   class   格式   

package com.lezhi.business.dxxbs.transmission.table

import com.lezhi.common.{CommonTransmissonFunciton, SystemParams}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object user_login {
  def main(args: Array[String]): Unit = {

    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bnv = StreamTableEnvironment.create(bsEnv, bsSettings)
    val table_name="user_login"
    val primaryKey="USER_ID"
    val table_column=
      """
        |USER_ID  STRING,
        |USER_PHONE  STRING,
        |USER_PWD  STRING,
        |CREAT_TIME  STRING,
        |UPLOAD_TIME  STRING,
        |UNION_ID  STRING,
        |OPEN_ID  STRING
        |""".stripMargin

    val sql_source_table="CREATE TABLE source_table_"+table_name+" (" +
      table_column+
      ") WITH (" +
      "‘connector‘ = ‘kafka‘," +         //连接类型为kafka
      "‘topic‘ = ‘"+SystemParams.TOPIC+"‘," +   //kafka topic名称
      "‘properties.bootstrap.servers‘ = ‘"+SystemParams.BOOTSTRAP_SERVER+"‘," +     //kafka bootstrap.servers配置
      "‘scan.startup.mode‘ = ‘earliest-offset‘," +   // topic消费位置设置
      "‘format‘ = ‘canal-json‘," +    //数据格式配置
      "‘canal-json.ignore-parse-errors‘ = ‘true‘," +    //当解析异常时,忽略字段的解析异常,则会将该字段值设置为null。
      "‘canal-json.table.include‘ =‘"+table_name+"‘)"


    bnv.executeSql(sql_source_table)

//    bnv.executeSql("select * from source_table_"+table_name).print()
val sql_result_table ="CREATE TABLE sink_table_"+table_name+" (" +
  table_column+
  ",PRIMARY KEY ("+primaryKey+") NOT ENFORCED" +
  ") WITH (" +
  "‘connector‘ = ‘jdbc‘," +   //连接类型为jdbc
  "‘url‘ = ‘"+SystemParams.JDBC_URL_BYMM+"‘," +     //he JDBC database url.
  "‘table-name‘ = ‘"+table_name+"‘," +    //连接的表名
  " ‘username‘ =‘"+SystemParams.JDBC_USERNAME+"‘,"+     //连接数据库用户名
  " ‘password‘ =‘"+SystemParams.JDBC_PASSWORD+"‘)"

    println(sql_result_table)
    bnv.executeSql(sql_result_table)
    bnv.executeSql("INSERT INTO sink_table_"+table_name+" SELECT * FROM source_table_"+table_name)

    bnv.execute(table_name)

  }
}

注意,下沉时下沉的表必须要有主键,否则会在更新数据时,旧数据和新数据会同时存在

flink-sql解析canal-json实现实时同步

标签:cut   main   source   error   sso   res   mod   class   格式   

原文地址:https://www.cnblogs.com/gzgBlog/p/15001333.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!