码迷,mamicode.com
首页 > 其他好文 > 详细

Flink状态管理与恢复(3)

时间:2020-10-06 20:55:32      阅读:31      评论:0      收藏:0      [点我收藏+]

标签:功能   nali   stream   交互   保存   存储位置   数据存储   execution   tcl   

     在上一篇博客当中,我们已经介绍了Flink的容错机制归根结底依赖的是Checkpoint机制,所以本篇博客是该章的核心.

什么是Checkpoint

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,

当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

Checkpoint的前提

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:

a. 需要有持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等);

b. 需要有用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等),注意这里指的checkpoint的数据必须得有一个可以持久化存储的地方,这个不难理解。

Checkpoint的容错机制图

1. 生成快照

首先Flink通过Checkpoint机制可以实现对Source中的数据和Task中的State进行数据存储(即生成快照):

技术图片

 

2. 恢复快照

Flink还可以通过Restore机制来恢复之前Checkpoint快照中保存的Source数据和Task中的数据:

技术图片

 

 

如何开启Flink的Checkpoint功能

默认情况下,Flink的checkpoint功能是disabled的,想要使用的时候需要先启用:

env.enableCheckpointing(1000);

完整的参考代码如下:

        //获取Flink的运行环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //开启flink的checkpoint功能:每隔1000 ms启动一个检查点(设置checkpoint的生命周期.)
        env.enableCheckpointing(1000);

        //checkpoint高级选项设置.
        //设置checkpoint的模式为exactly-once(这也是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //确保检查点之间至少有500ms的间隔(即checkpoint的最小间隔)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //检查点必须在1min之内完成,否则会被丢弃(checkpoint的超时时间)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //同一时间只允许操作一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //Flink程序即使被cancel后,也会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方)
        env.setStateBackend(new FsStateBackend("hdfs://s101:9000/flink/checkpoints"));

  

 

Flink状态管理与恢复(3)

标签:功能   nali   stream   交互   保存   存储位置   数据存储   execution   tcl   

原文地址:https://www.cnblogs.com/zhangmingyang/p/13773133.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!