标签:技术 实例 family xtend info instance private collect 案例
一、State
在Flink中,按照基本类型,对State做了以下两类的划分:
Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用。我们可以从逻辑上理解这种状态是一个并行度操作实例和一种Key的对应, <parallel-operator-instance, key>。
Operator State(或者non-keyed state),它是和Key无关的一种状态类型。相应地我们从逻辑上去理解这个概念,它相当于一个并行度实例,对应一份状态数据。因为这里没有涉及Key的概念,所以在并行度(扩/缩容)发生变化的时候,这里会有状态数据的重分布的处理。如下图:
Keyed State 应用示例:
代码示例:
public class StateManager extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * 操作 state 的句柄 * @param longLongTuple2 * @param collector * @throws Exception */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { //获取state值 Tuple2<Long, Long> currentSum = sum.value(); currentSum.f0 = currentSum.f0 + 1; currentSum.f1 = currentSum.f1 + value.f1; //操作state更新 sum.update(currentSum); //输出flatMap的算子结果 if(currentSum.f0 >= 2) { out.collect(new Tuple2<Long, Long>(value.f0, currentSum.f1/currentSum.f0)); } } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>( "average", //状态的名称 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), //状态的类型 Tuple2.of(0L, 0L) //状态的初始默认值 ); sum = getRuntimeContext().getState(descriptor); } }
Operator State 应用示例:
标签:技术 实例 family xtend info instance private collect 案例
原文地址:https://www.cnblogs.com/gxyandwmm/p/12021648.html