Flink Basics (13): Table API and Flink SQL (2) — API Calls
val tableEnv = ...  // create the table execution environment

// create a table that reads data from an external system
tableEnv.connect(...).createTemporaryTable("inputTable")

// register a table to which results will be written
tableEnv.connect(...).createTemporaryTable("outputTable")

// obtain a result table with Table API operators
val result = tableEnv.from("inputTable").select(...)

// obtain a result table with a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

// write the result table to the output table
result.insertInto("outputTable")
val tableEnv = StreamTableEnvironment.create(env)
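All of the snippets in this post share the same setup. A minimal sketch of the assumed imports and environment (Flink 1.10, Scala API); the variable env is assumed by the streaming examples throughout:

// imports assumed by the snippets below (Flink 1.10, Scala API)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._
import org.apache.flink.types.Row
// (the batch examples additionally need org.apache.flink.api.scala._)

// env is the regular DataStream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment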
val settings = EnvironmentSettings.newInstance()
  .useOldPlanner()     // use the legacy planner
  .inStreamingMode()   // streaming mode
  .build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// legacy planner, batch mode (based on the DataSet API)
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
// blink planner, streaming mode
val bsSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
// blink planner, batch mode
val bbSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val bbTableEnv = TableEnvironment.create(bbSettings)
tableEnv
  .connect(new FileSystem().path("sensor.txt"))  // define the external system the table reads from
  .withFormat(new OldCsv())                      // define how data read from the external system is parsed
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  )                                              // define the table schema
  .createTemporaryTable("inputTable")            // create the temporary table
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.0</version>
</dependency>
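The legacy OldCsv format used above ships with the planner itself, while the standard (RFC 4180-compliant) Csv format used in the Kafka example below lives in the separate flink-csv module, which is why this dependency is needed.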
tableEnv.connect(
  new Kafka()
    .version("0.11")    // kafka version
    .topic("sensor")    // topic to read from
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
)
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  )
  .createTemporaryTable("kafkaInputTable")
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
  .select("id, temperature")
  .filter("id = 'sensor_1'")
val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id =‘sensor_1‘")
val resultSqlTable: Table = tableEnv.sqlQuery(
  """
    |select id, temperature
    |from inputTable
    |where id = 'sensor_1'
  """.stripMargin)
val aggResultTable = sensorTable
  .groupBy('id)
  .select('id, 'id.count as 'count)
val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id")
val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
  .map(data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
  })

// by default, fields are taken from the case class by name
val sensorTable: Table = tableEnv.fromDataStream(dataStream)

// fields can also be selected and renamed explicitly
val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts)
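The SensorReading case class used in the map is not defined in this excerpt; a minimal definition consistent with the constructor call above:

// inferred from the (id, timestamp, temperature) usage above
case class SensorReading(id: String, timestamp: Long, temperature: Double)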
// name-based mapping: fields can be reordered and renamed with `as`
val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)

// position-based mapping: new names are assigned to the fields in order
val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)
tableEnv.createTemporaryView("sensorView", dataStream) tableEnv.createTemporaryView("sensorView", dataStream, ‘id, ‘temperature, ‘timestamp as ‘ts)
tableEnv.createTemporaryView("sensorView", sensorTable)
// register the output table
tableEnv.connect(
  new FileSystem().path("…\\resources\\out.txt")  // define the connection to the file system
)
  .withFormat(new Csv())                          // define the serialization format: CSV
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE())
  )                                               // define the table schema
  .createTemporaryTable("outputTable")            // create the temporary table

resultSqlTable.insertInto("outputTable")
// write to kafka
tableEnv.connect(
  new Kafka()
    .version("0.11")
    .topic("sinkTest")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
)
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE())
  )
  .createTemporaryTable("kafkaOutputTable")

resultTable.insertInto("kafkaOutputTable")
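Note that the Kafka table sink is append-only: it can accept resultTable (a plain select/filter), but not an aggregated table whose rows are updated as new data arrives. Update results need an upsert-capable sink such as the Elasticsearch sink below.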
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.10.0</version>
</dependency>
// write to elasticsearch
tableEnv.connect(
  new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("sensor")
    .documentType("temp")
)
  .inUpsertMode()   // specify upsert mode
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("count", DataTypes.BIGINT())
  )
  .createTemporaryTable("esOutputTable")

aggResultTable.insertInto("esOutputTable")
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
// write to MySQL
val sinkDDL: String =
  """
    |create table jdbcOutputTable (
    |  id varchar(20) not null,
    |  cnt bigint not null
    |) with (
    |  'connector.type' = 'jdbc',
    |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
    |  'connector.table' = 'sensor_count',
    |  'connector.driver' = 'com.mysql.jdbc.Driver',
    |  'connector.username' = 'root',
    |  'connector.password' = '123456'
    |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL)   // register the sink table by executing the DDL
aggResultSqlTable.insertInto("jdbcOutputTable")
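Unlike the file, Kafka, and Elasticsearch sinks above, the JDBC table is registered by executing a CREATE TABLE DDL string through sqlUpdate rather than through the connect() descriptor API; in this Flink version the JDBC connector is wired up via DDL.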
val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable)

resultStream.print("result")
aggResultStream.print("aggResult")
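In the retract stream, each element is a (Boolean, T) pair: true marks an insert and false marks the retraction of a previously emitted row. The plain select/filter result can use toAppendStream, but the aggregated table updates its results as data arrives and therefore must be converted with toRetractStream.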
val explanation: String = tableEnv.explain(resultTable)
println(explanation)
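explain returns a string describing the query's abstract syntax tree along with the optimized and physical execution plans. Finally, nothing runs until the job is triggered; a sketch, assuming the streaming environment env from the setup above (the job name is arbitrary):

env.execute("table api job")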
Original post: https://www.cnblogs.com/qiu-hua/p/13432502.html