标签:sonar time alibaba code ports str dso sharp gets
引入依赖
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.33</version> </dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行") val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args)) println(begin_date) //添加kakka数据源 val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps()) .setStartFromEarliest()) //设置消费kafka位置 .map(JSON.parseObject(_)) .filter(_.get("table")=="epidemic_report") .filter(_.get("type").toString.matches("(INSERT|UPDATE)")) .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass)) // .filter(_.getSet_id=="1") .filter(_.getCreat_time > begin_date)
标签:sonar time alibaba code ports str dso sharp gets
原文地址:https://www.cnblogs.com/gzgBlog/p/14928301.html