// Tags: locale assign 大数据 (big data) create apach ack lte func timestamp
package com.sjw.flink
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Event-time windowing demo.
 *
 * Reads comma-separated sensor lines (`id,timestamp,temperature`) from a socket,
 * assigns event-time timestamps, and prints the minimum temperature per sensor id
 * over a 15-second sliding window that advances every 5 seconds.
 */
object EventWindowTest {

  import scala.util.Try

  /**
   * Parses one `id,timestamp,temperature` line into a [[SensorReading]].
   *
   * @param line raw text line received from the source
   * @return `Some(reading)` when the line has at least three comma-separated fields and
   *         the second/third parse as `Long`/`Double`; `None` otherwise. Extra trailing
   *         fields are ignored.
   */
  private def parseReading(line: String): Option[SensorReading] =
    line.split(",") match {
      case Array(id, ts, temp, _*) =>
        // Try only catches non-fatal errors (here: NumberFormatException).
        Try(SensorReading(id.trim, ts.trim.toLong, temp.trim.toDouble)).toOption
      case _ =>
        None
    }

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.setParallelism(1)
    // Use event time as the time semantics for windowing.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Input source. Alternative file-based source:
    // val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")
    val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com", 6666)

    val dataStream: DataStream[SensorReading] = stream
      .filter(_.nonEmpty)
      // A single malformed line must not fail the whole job: skip it and report it.
      .flatMap { (line: String) =>
        val parsed = parseReading(line)
        if (parsed.isEmpty) Console.err.println(s"Skipping malformed sensor record: $line")
        parsed.toList
      }
      // For input without out-of-order records only the timestamp has to be supplied.
      // NOTE(review): the * 1000 presumably converts seconds to milliseconds — confirm
      // against the SensorReading definition.
      .assignAscendingTimestamps(_.timestamp * 1000)
    // More general case, tolerating out-of-order records:
    // .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
    //   override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
    // })

    // Minimum temperature within a 10-second tumbling window (left-closed, right-open):
    // val minTemp: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature))
    //   .keyBy(_._1)
    //   .timeWindow(Time.seconds(10))
    //   .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    // Sliding window: minimum temperature over 15 s, emitted every 5 s.
    val minTemp: DataStream[(String, Double)] = dataStream
      .map(data => (data.id, data.temperature))
      // Typed key selector instead of the positional (untyped) keyBy(0).
      .keyBy(_._1)
      .timeWindow(Time.seconds(15), Time.seconds(5))
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    dataStream.print("inputData")
    minTemp.print("minTemp")
    env.execute()
  }
}
// Tags: locale assign 大数据 (big data) create apach ack lte func timestamp
// Original article: https://www.cnblogs.com/whyuan/p/13276975.html