码迷,mamicode.com
首页 > 其他好文 > 详细

大数据之窗口统计温度

时间:2020-07-09 23:59:55      阅读:253      评论:0      收藏:0      [点我收藏+]

标签:env   最小   red   one   main   lte   unit   reduce   cond   

package com.sjw.flink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com",6666)

val dataDS: DataStream[(String, Double)] = stream.filter(_.nonEmpty).map(data => {
val arr: Array[String] = data.split(",")
(arr(0).trim, arr(2).trim.toDouble)
})
dataDS.print("dataDS")

//时间窗口-滚动窗口 10秒内的最小温度
val minStream: DataStream[(String, Double)] = dataDS.map(data => (data._1,data._2))
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.reduce((data1,data2) => (data1._1,data1._2.min(data2._2)))
//minStream.print("minStream")

//时间窗口-滑动窗口 窗口大小15秒 滑动步长5秒
val minSlideStream: DataStream[(String, Double)] = dataDS.map(data => (data._1,data._2))
.keyBy(_._1)
.timeWindow(Time.seconds(15),Time.seconds(5))
.reduce((data1,data2) => (data1._1,data1._2.min(data2._2)))
// minSlideStream.print("minSlideStream")

//计数滚动窗口-最小温度
val countWindowStream: DataStream[(String, Double)] = dataDS.map(data => (data._1, data._2))
.keyBy(_._1)
.countWindow(5)
.reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
//countWindowStream.print("countWindowStream")

//计数滑动窗口-最小温度
val countSlideWindowStream: DataStream[(String, Double)] = dataDS.map(data => (data._1, data._2))
.keyBy(_._1)
.countWindow(10,2)
.reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
countSlideWindowStream.print("countSlideWindowStream")


env.execute()
}
}

大数据之窗口统计温度

标签:env   最小   red   one   main   lte   unit   reduce   cond   

原文地址:https://www.cnblogs.com/whyuan/p/13276849.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!