标签:env 最小 red one main lte unit reduce cond
package com.sjw.flink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com",6666)
val dataDS: DataStream[(String, Double)] = stream.filter(_.nonEmpty).map(data => {
val arr: Array[String] = data.split(",")
(arr(0).trim, arr(2).trim.toDouble)
})
dataDS.print("dataDS")
//时间窗口-滚动窗口 10秒内的最小温度
val minStream: DataStream[(String, Double)] = dataDS.map(data => (data._1,data._2))
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.reduce((data1,data2) => (data1._1,data1._2.min(data2._2)))
//minStream.print("minStream")
//时间窗口-滑动窗口 窗口大小15秒 滑动步长5秒
val minSlideStream: DataStream[(String, Double)] = dataDS.map(data => (data._1,data._2))
.keyBy(_._1)
.timeWindow(Time.seconds(15),Time.seconds(5))
.reduce((data1,data2) => (data1._1,data1._2.min(data2._2)))
// minSlideStream.print("minSlideStream")
//计数滚动窗口-最小温度
val countWindowStream: DataStream[(String, Double)] = dataDS.map(data => (data._1, data._2))
.keyBy(_._1)
.countWindow(5)
.reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
//countWindowStream.print("countWindowStream")
//计数滑动窗口-最小温度
val countSlideWindowStream: DataStream[(String, Double)] = dataDS.map(data => (data._1, data._2))
.keyBy(_._1)
.countWindow(10,2)
.reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
countSlideWindowStream.print("countSlideWindowStream")
env.execute()
}
}
标签:env 最小 red one main lte unit reduce cond
原文地址:https://www.cnblogs.com/whyuan/p/13276849.html