Flink - Checkpoint使用详解2(开启Checkpoint、相关参数配置)
二、开启 Checkpoint
1,基本用法
(1)使用 enableCheckpointing 方法即可开启 Checkpoint:
// 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期) env.enableCheckpointing(5000)
(2)针对 checkpoint 还有一些相关的配置:
// 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期) env.enableCheckpointing(5000) //设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔) env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) //Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间) env.getCheckpointConfig.setCheckpointTimeout(60000) //同一时间只允许执行一个Checkpoint env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
2,设置 State 数据存储的位置
(1)默认情况下,State 数据会保存在 TaskManager 的内存中。Checkpoint 执行时,会将 State 数据存储在 JobManager 的内存中。
(2)具体的存储位置取决于 State Backend 的配置,Flink 一共提供了 3 种存储方式:
- MemoryStateBackend:State 数据保存在 Java 堆内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到 JobManager 的内存中,基于内存的 State Backend 在生产环境下不建议使用。
- FsStateBackend:State 数据保存在 TaskManager 的内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到配置的文件系统中,可以使用 HDFS 等分布式文件系统。
- RocksDBStateBackend:RocksDB 跟上面的都略有不同,它会在本地文件系统中维护 State,State 会直接写入本地 RocksDB 中。同时它需要配置一个远端的文件系统(一般是 HDFS),在做 Checkpoint 的时候,会把本地的数据直接复制到远端的文件系统中。故障切换的时候直接从远端的文件系统中恢复数据到本地。RocksDB 克服了 State 受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产环境中使用。
3,使用 RocksDBStateBackend 进行快照存储
(1)首先在项目中需要引入 RocksDBStateBackend 相关依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>1.16.0</version> </dependency>
(2)然后在代码种设置状态数据存储的位置:
// 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期) env.enableCheckpointing(5000) // 设置存储位置(true表示增量快照) env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",true))
(3)运行程序,可以看到 HDFS 上指定位置生成了相关的目录和文件:

(4)由于我们设置了每隔 5 秒进行一次 checkpoint,可以发现快照的 ID 也是每隔五秒变化一次。
