码迷,mamicode.com
首页 > 其他好文 > 详细

大数据之统计最低温度

时间:2020-07-10 00:16:33      阅读:80      评论:0      收藏:0      [点我收藏+]

标签:locale   assign   大数据   create   apach   ack   lte   func   timestamp   

package com.sjw.flink

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object EventWindowTest {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.setParallelism(1)

//设置时间语义为eventtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//获取数据
//val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")
val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com",6666)

val dataStream: DataStream[SensorReading] = stream.filter(_.nonEmpty).map(data => {
val arr: Array[String] = data.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
})
.assignAscendingTimestamps(_.timestamp * 1000) //没有乱序数据的场景只需要指定时间戳
//更一般的场景 允许乱序数据
// .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
// override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
// })

//10秒滚动窗口内的最小温度 左闭右开
// val minTemp: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature))
// .keyBy(0)
// .timeWindow(Time.seconds(10))
// .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

//滑动窗口 统计15s最小温度,每隔5s输出一次
val minTemp: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature))
.keyBy(0)
.timeWindow(Time.seconds(15),Time.seconds(5))
.reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))


dataStream.print("inputData")
minTemp.print("minTemp")

env.execute()
}
}

大数据之统计最低温度

标签:locale   assign   大数据   create   apach   ack   lte   func   timestamp   

原文地址:https://www.cnblogs.com/whyuan/p/13276975.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!