上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态
值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状
态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。
2 有状态的算子和应用程序
Flink 内置的很多算子,数据源 source,数据存储 sink 都是有状态的,流中的数据都是 buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction
会缓存输入流的数据,ProcessFunction 会保存设置的定时器信息等等。
在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
? 算子状态(operator state) 算子状态的作用范围限定为算子任务
? 键控状态(keyed state) 根据输入数据流中定义的键(key)来维护和访问
2.1 算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由
相同或不同算子的另一个任务访问。
图 具有算子状态的任务
Flink 为算子状态提供三种基本数据结构:
? 列表状态(List state)
将状态表示为一组数据的列表。
? 联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
? 广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
2.2 键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护
和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于
一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。
图 具有键控状态的任务
Flink 的 Keyed State 支持以下数据类型:
? ValueState[T]保存单个的值,值的类型为 T。
o get 操作: ValueState.value()
o set 操作: ValueState.update(value: T)
? ListState[T]保存一个列表,列表里的元素的数据类型为 T。基本操作如下:
o ListState.add(value: T)
o ListState.addAll(values: java.util.List[T])
o ListState.get()返回 Iterable[T]
o ListState.update(values: java.util.List[T])
? MapState[K, V]保存 Key-Value 对。
o MapState.get(key: K)
o MapState.put(key: K, value: V)
o MapState.contains(key: K)
o MapState.remove(key: K)
? ReducingState[T]
? AggregatingState[I, O]
State.clear()是清空操作。
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
val alerts: DataStream[(String, Double, Double)] = keyedData
.flatMap(new TemperatureAlertFunction(1.7))
class TemperatureAlertFunction(val threshold: Double) extends
RichFlatMapFunction[SensorReading, (String, Double, Double)] {
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp",
classOf[Double])
lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) }
override def flatMap(reading: SensorReading,
out: Collector[(String, Double, Double)]): Unit = {
val lastTemp = lastTempState.value()
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
out.collect((reading.id, reading.temperature, tempDiff))
}
this.lastTempState.update(reading.temperature) } }
View Code
通过 RuntimeContext 注册 StateDescriptor。StateDescriptor 以状态 state 的名字和存储的数据类型为参数。
在 open()方法中创建 state 变量。注意复习之前的 RichFunction 相关知识。
接下来我们使用了 FlatMap with keyed ValueState 的快捷方式 flatMapWithState实现以上需求。
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMapWithState[(String, Double, Double), Double] {
case (in: SensorReading, None) =>
(List.empty, Some(in.temperature))
case (r: SensorReading, lastTemp: Some[Double]) =>
val tempDiff = (r.temperature - lastTemp.get).abs
if (tempDiff > 1.7) {
(List((r.id, r.temperature, tempDiff)), Some(r.temperature))
} else {
(List.empty, Some(r.temperature))
}
}
View Code