标签:incr 存在 第一个 string get end date ide port
(1) ProcessFunction
在没有开窗和keyby的情况下使用
(2) KeyedProcessFunction
在keyby之后使用
(3) CoProcessFunction
(4) ProcessJoinFunction
(5) BroadcastProcessFunction
(6) KeyedBroadcastProcessFunction
(7) ProcessWindowFunction
在开窗后使用
(8) ProcessAllWindowFunction
KeyedProcessFunction 用来操作 KeyedStream。 KeyedProcessFunction 会处理流的每一个元素,
输出为 0 个、 1 个或者多个元素。所有的 Process Function 都继承自 RichFunction 接口,所以
都有 open()、 close() 和 getRuntimeContext() 等方法。而 KeyedProcessFunction[KEY, IN, OUT]
还额外提供了两个方法:
? processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这
个方法,调用结果将会放在 Collector 数据类型中输出。 Context 可以访问元素的时间
戳,元素的 key,以及 TimerService 时间服务。 Context 还可以将结果输出到别的流 (side
outputs)。
? onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数。
当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。目录 94
Collector 为输出结果的集合。 OnTimerContext 和 processElement 的 Context 参数一样,
提供了上下文的一些信息,例如 firing trigger 的时间信息 (事件时间或者处理时间)。
Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
? currentProcessingTime(): Long 返回当前处理时间
? currentWatermark(): Long 返回当前水位线的时间戳
? registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的 timer。当 processing time 到达定时时间时,触发 timer。
? registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time
timer。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
? deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时
器。如果没有这个时间戳的定时器,则不执行。
? deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,
如果没有此时间戳的定时器,则不执行。
当定时器 timer 触发时,执行回调函数 onTimer()。 processElement() 方法和 onTimer() 方法是
同步(不是异步)方法,这样可以避免并发访问和操作状态。
package test4
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// nc -lk 9999
//a 1
object ProcessingTimeOnTimer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream = env
.socketTextStream("localhost", 9999, ‘\n‘)
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1))
})
.keyBy(_._1)
.process(new MyKeyedProcess)
stream.print()
env.execute()
}
class MyKeyedProcess extends KeyedProcessFunction[String, (String, String), String] {
// 来一条数据调用一次
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit = {
// 当前机器时间
val curTime = ctx.timerService().currentProcessingTime()
// 当前机器时间10s之后,触发定时器
ctx.timerService().registerProcessingTimeTimer(curTime + 10 * 1000L)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("位于时间戳:" + new Timestamp(timestamp) + "的定时器触发了!")
}
}
}
package test4
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// nc -lk 9999
//a 1
object ProcessingTimeOnTimer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream = env
.socketTextStream("localhost", 9999, ‘\n‘)
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1))
})
.keyBy(_._1)
.process(new MyKeyedProcess)
stream.print()
env.execute()
}
class MyKeyedProcess extends KeyedProcessFunction[String, (String, String), String] {
// 来一条数据调用一次
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit = {
// 当前机器时间
val curTime = ctx.timerService().currentProcessingTime()
// 当前机器时间10s之后,触发定时器
ctx.timerService().registerProcessingTimeTimer(curTime + 10 * 1000L)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("位于时间戳:" + new Timestamp(timestamp) + "的定时器触发了!")
}
}
}
package test4
import test2.{SensorReading, SensorSource}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// 如果某一个传感器连续1s中温度上升,报警!
object TempIncreaseAlert {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
.keyBy(_.id)
.process(new TempIncreaseAlertFunction)
stream.print()
env.execute()
}
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
// 用来存储最近一次的温度
// 当保存检查点的时候,会将状态变量保存到状态后端
// 默认状态后端是内存,也可以配置hdfs等为状态后端
// 懒加载,当运行到process方法的时候,才会惰性赋值
// 状态变量只会被初始化一次
// 根据`last-temp`这个名字到状态后端去查找,如果状态后端中没有,那么初始化
// 如果在状态后端中存在`last-temp`的状态变量,直接懒加载
// 默认值是`0.0`
lazy val lastTemp = getRuntimeContext.getState(
new ValueStateDescriptor[Double](
"last-temp",
Types.of[Double]
)
)
// 存储定时器时间戳的状态变量
// 默认值是`0L`
lazy val currentTimer = getRuntimeContext.getState(
new ValueStateDescriptor[Long](
"timer",
Types.of[Long]
)
)
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 获取最近一次的温度, 使用`.value()`
val prevTemp = lastTemp.value()
// 将当前温度存入状态变量, `.update()`
lastTemp.update(value.temperature)
// 获取定时器状态变量中的时间戳
val curTimerTimestamp = currentTimer.value()
// 温度:1,2,3,4,5,2
if (prevTemp == 0.0 || value.temperature < prevTemp) {
// 如果当前温度是第一个温度读数,或者温度下降
// 删除状态变量保存的时间戳对应的定时器
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
currentTimer.clear() // 清空状态变量
} else if (value.temperature > prevTemp && curTimerTimestamp == 0L) {
// 如果温度上升,且保存定时器时间戳的状态变量为空,就注册一个定时器
// 注册一个1s之后的定时器
val timerTs = ctx.timerService().currentProcessingTime() + 1000L
ctx.timerService().registerProcessingTimeTimer(timerTs)
// 将时间戳存入状态变量
currentTimer.update(timerTs)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("传感器ID为 " + ctx.getCurrentKey + " 的传感器,温度连续1秒钟上升了!")
currentTimer.clear() // 清空状态变量
}
}
}
package test4
import test2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SideOutputExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
.process(new FreezingMonitor)
stream
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()
stream.print() // 打印主流
env.execute()
}
// 为什么用`ProcessFunction`? 因为没有keyBy分流
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
// 定义侧输出标签
lazy val freezingAlarmOutput = new OutputTag[String]("freezing-alarms")
// 来一条数据,调用一次
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature < 32.0) {
// 将报警信息发送到侧输出流
ctx.output(freezingAlarmOutput, s"传感器ID为 ${value.id} 的传感器发出低于32华氏度的低温报警!")
}
out.collect(value) // 在主流上,将数据继续向下发送
}
}
}
9 Process Function (Low-Level API)
标签:incr 存在 第一个 string get end date ide port
原文地址:https://www.cnblogs.com/andyonline/p/13369585.html