标签:support 结束 stream 并且 path 丢失 nta 消息 size
在Flink中,函数和操作符都可以是有状态的。在处理每个消息或者元素时,有状态的函数都会储存信息,使得状态成为精密操作中关键的组成部分。
为了使状态能够容错,Flink会checkpoints状态。checkpoints机制使得Flink可以恢复状态和位置,以至于流计算的应用可以提供无故障执行的语义。
Flink的checkpointing机制对流和状态的可靠存储有如下两点要求:
Flink默认不启用Checkpointing。如果要启用,可以在StreamExecutionEnvironment上调用enableCheckpointing(n),其中n是以毫秒为单位的checkpoint间隔。
还有其他一些参数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
相关的配置选项
Key | Default | Description |
---|---|---|
state.backend |
(none) | The state backend to be used to store and checkpoint state. |
state.backend.async |
true | Option whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or only support asynchronous snapshots, and ignore this option. |
state.backend.fs.memory-threshold |
1024 | The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. |
state.backend.incremental |
false | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option. |
state.backend.local-recovery |
false | |
state.backend.rocksdb.localdir |
(none) | The local directory (on the TaskManager) where RocksDB puts its files. |
state.checkpoints.dir |
(none) | The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). |
state.checkpoints.num-retained |
1 | The maximum number of completed checkpoints to retain. |
state.savepoints.dir |
(none) | The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). |
taskmanager.state.local.root-dirs |
(none) |
Flink的checkpointing机制会存储状态的一致性快照,配置了不同的状态存储策略,checkpoints就会保存在不同的地方,比如JM的内存,文件系统还是数据库。
默认,状态会保存在TM的内存中,checkpoints会保存在JM的内存中。但是为了保存特别大的状态,Flink也支持将状态保存和checkpointing到其他的地方。
Flink目前只对没有迭代的作业提供处理保证。在迭代作业中进行checkpointing会导致异常发生,如果用户强制要在迭代应用中启用checkpoint,则需要设置env.enableCheckpointing(interval, force = true),但这也不能保证处在循环边界上的数据和状态不会丢失。
Flink支持很多重启策略,比如Fixed Delay Restart Strategy,Failure Rate Restart Strategy,No Restart Strategy,Fallback Restart Strategy,容后再叙。
标签:support 结束 stream 并且 path 丢失 nta 消息 size
原文地址:https://www.cnblogs.com/029zz010buct/p/9404982.html