标签:功能 nali stream 交互 保存 存储位置 数据存储 execution tcl
在上一篇博客当中,我们已经介绍了Flink的容错机制归根结底依赖的是Checkpoint机制,所以本篇博客是该章的核心.
为了保证state的容错性,Flink需要对state进行checkpoint。
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,
当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
a. 需要有持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等);
b. 需要有用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等),注意这里指的checkpoint的数据必须得有一个可以持久化存储的地方,这个不难理解。
1. 生成快照
首先Flink通过Checkpoint机制可以实现对Source中的数据和Task中的State进行数据存储(即生成快照):
2. 恢复快照
Flink还可以通过Restore机制来恢复之前Checkpoint快照中保存的Source数据和Task中的数据:
默认情况下,Flink的checkpoint功能是disabled的,想要使用的时候需要先启用:
env.enableCheckpointing(1000);
完整的参考代码如下:
//获取Flink的运行环境. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //开启flink的checkpoint功能:每隔1000 ms启动一个检查点(设置checkpoint的生命周期.) env.enableCheckpointing(1000); //checkpoint高级选项设置. //设置checkpoint的模式为exactly-once(这也是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //确保检查点之间至少有500ms的间隔(即checkpoint的最小间隔) env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //检查点必须在1min之内完成,否则会被丢弃(checkpoint的超时时间) env.getCheckpointConfig().setCheckpointTimeout(60000); //同一时间只允许操作一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //Flink程序即使被cancel后,也会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方) env.setStateBackend(new FsStateBackend("hdfs://s101:9000/flink/checkpoints"));
标签:功能 nali stream 交互 保存 存储位置 数据存储 execution tcl
原文地址:https://www.cnblogs.com/zhangmingyang/p/13773133.html