码迷,mamicode.com
首页 > 其他好文 > 详细

Flink之Table初探

时间:2021-01-14 10:44:32      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:--   pen   ack   int   rgb   tab   form   结构   auth   

知识点

Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。

1、依赖:Table API 和 SQL 需要引入的依赖

 <!-- old planner flink table-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!--new planner-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>

2、代码案例

package table

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
/**
 * @author yangwj
 * @date 2021/1/12 21:17
 * @version 1.0
 */
object TableExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //1、基于流创建表
    val table: Table = tableEnv.fromDataStream(dataStream)

    //2、调用table api进行转换
    val result: Table = table.select("id,temperature").filter("id == ‘sensor_1‘")
    result.toAppendStream[(String, Double)].print("result")

    //2、sql实现
    tableEnv.createTemporaryView("tabel",table)
    val sql = "select id, temperature from tabel where id = ‘sensor_1‘"

    val sqlResult: Table = tableEnv.sqlQuery(sql)
    sqlResult.toAppendStream[(String, Double)].print("sqlResult")
    env.execute("table api")
  }
}

 

Flink之Table初探

标签:--   pen   ack   int   rgb   tab   form   结构   auth   

原文地址:https://www.cnblogs.com/ywjfx/p/14269738.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!