标签:namenode value col 存在 吞吐量 rocksdb actor 比较 direct
流计算中可能有各种方式来保存状态:
CheckpointedFunction的函数
当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。
Flink提供了三种持久化策略,如果没有显式指定,则默认使用MemoryStateBackend。
将数据保存在java的堆里,kv状态或者window operator用hash table来保存values,triggers等等。
当进行checkpoints的时候,这种策略会对状态做快照,然后将快照作为checkpoint acknowledgement的一部分发送给JobManager,JM也将其保存在堆中。
MemoryStateBackend可以使用异步的方式进行快照,我们也鼓励使用异步的方式,避免阻塞,现在默认就是异步。如果不希望异步,可以在构造的时候传入false,如下:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
限制:
适合:
FsStateBackend 通过文件系统的URL来设置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保持数据在TM的内存中,当做checkpointing的时候,会将状态快照写入文件,保存在文件系统或本地目录。少量的元数据会保存在JM的内存中。
默认使用异步的方式进行快照,同样,取消异步需要传递false:
new FsStateBackend(path, false);
适用:
RocksDBStateBackend 通过文件系统的URL来设置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保存数据在一个叫做RocksDB的数据库中,这个数据库保存在TM的数据目录中。当做checkpointing时,整个数据库会被写入文件系统和目录。少量的元信息会保存在JM的内存中。
这种策略只支持异步快照。
限制:
适合:
能够持有的状态的多少只取决于可使用的磁盘大小,这会允许使用非常大的状态,相比较FsStateBackend将状态保存在内存中。但这也同时意味着,这个策略的吞吐量会受限。
RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。
如果你没有指定任何策略,默认使用JM作为存储策略。如果你想更改,可以在flink-conf.yaml中变更,存储策略也可以在作业中单独设定。
可以在StreamExecutionEnvironment中指定:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
默认的状态存储策略通过在flink-conf.yaml中通过state.backend来指定,有如下一些可选:
也可以以全路径来指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
来代替 RocksDBStateBackend,不过,何必了。
state.checkpoints.dir这个参数来指定所有的checkpoints数据和元数据存储的位置。示例如下:
# The backend that will be used to store operator state checkpoints state.backend: filesystem # Directory for storing checkpoints state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
标签:namenode value col 存在 吞吐量 rocksdb actor 比较 direct
原文地址:https://www.cnblogs.com/029zz010buct/p/9403283.html