码迷,mamicode.com
首页 > Web开发 > 详细

flink解析canal-json数据

时间:2021-06-25 16:38:01      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:sonar   time   alibaba   code   ports   str   dso   sharp   gets   

引入依赖

    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>

  

val env = StreamExecutionEnvironment.getExecutionEnvironment
    println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行")
    val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
    println(begin_date)

    //添加kakka数据源

    val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
      .setStartFromEarliest())  //设置消费kafka位置
      .map(JSON.parseObject(_))
      .filter(_.get("table")=="epidemic_report")
      .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
      .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass))
//      .filter(_.getSet_id=="1")
      .filter(_.getCreat_time > begin_date)

 

flink解析canal-json数据

标签:sonar   time   alibaba   code   ports   str   dso   sharp   gets   

原文地址:https://www.cnblogs.com/gzgBlog/p/14928301.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!