Flink - Checkpoint使用详解3(从Checkpoint进行恢复)
本文我们结合合具体的案例来演示一下 Checkpoint 是如何对状态数据进行持久化保存。以及当任务故障后,我们又如何基于 Checkpoint 产生的数据进行恢复。
(2)下面是使用 Java 语言实现同样的功能:
(2)然后执行如下命令向集群中提交此任务。
(3)当任务正常启动之后,在 socket 中模拟产生数据:
(4)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。



(3)当任务正常启动之后,查看任务界面中的 checkpoint 信息:

(5)查看任务日志显示如下信息,说明此任务在启动的时候恢复到了之前的状态,因为上一次任务停止之前,a 出现了 2 次,b 出现了 1 次。这次任务重启后,可以累加之前状态中记录的次数。这样就实现了任务故障后的数据恢复,可以保证流计算中数据的准确性,如果没有使用状态和 checkpoint,当任务重启后,所有的数据都会归 0。


(3)这个时候回到任务界面,可以看到界面中已经不显示这个 taskmanager 了:



(8)再验证一下输出结果,发现是没有问题的,是基于之前的状态进行累加的。
1,编写样例程序
(1)首先我们开发一个有状态的单词计数案例,并且在代码中开启 checkpoint。下面是 Scala 语言实现代码:
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object WordCountStateWithCheckpointDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //开启Checkpoint env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次 //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置Checkpoint后的状态数据的存储位置 // 设置存储位置(true表示增量快照) env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints", true)) val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .keyBy(_._1) keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] { //声明一个ValueState类型的状态变量,存储单词出现的总次数 private var countState: ValueState[Int] = _ /** * 任务初始化的时候这个方法执行一次 * @param parameters */ override def open(parameters: Configuration): Unit = { //注册状态 val valueStateDesc = new ValueStateDescriptor[Int]( "countState",//指定状态名称 classOf[Int]//指定状态中存储的数据类型 ) countState = getRuntimeContext.getState(valueStateDesc) } override def map(value: (String, Int)): (String,Int) = { //从状态中获取这个key之前出现的次数 var lastNum = countState.value() val currNum = value._2 //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0 if(lastNum == null){ lastNum = 0 } //汇总出现的次数 val sum = lastNum+currNum //更新状态 countState.update(sum) //返回单词及单词出现的总次数 (value._1,sum) } }).print() env.execute("WordCountStateWithCheckpointDemo") } }
(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountStateWithCheckpointDemoJava { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启Checkpoint env.enableCheckpointing(10000); // 每10秒执行一次 // 设置任务取消后保留Checkpoint env.getCheckpointConfig() .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置状态后端为RocksDB,并配置存储路径 env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints", true)); // 获取Socket流数据 DataStream<String> text = env.socketTextStream("192.168.121.128", 9999); // 处理数据流 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } }) .keyBy(tuple -> tuple.f0) .map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // 声明一个ValueState变量 private transient ValueState<Integer> countState; @Override public void open(Configuration parameters) throws Exception { // 注册状态 ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>( "countState", // 状态名称 Integer.class // 状态存储的数据类型 ); countState = getRuntimeContext().getState(descriptor); } @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception { // 获取当前Key的状态值 Integer lastNum = countState.value(); if (lastNum == null) { lastNum = 0; // 如果状态为空,初始化为0 } Integer currNum = value.f1; Integer sum = lastNum + currNum; // 更新状态 countState.update(sum); // 返回当前单词及其累计次数 return new Tuple2<>(value.f0, sum); } }).print(); // 启动程序 env.execute("WordCountStateWithCheckpointDemo"); } }
2,生成 Jar 包
我们修改项目依赖配置,然后进行打包。最后将生成的 jar 包上传到集群服务器中。具体细节可用参考我之前写的文章:
3,运行任务
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)当任务正常启动之后,在 socket 中模拟产生数据:
a b a
(4)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(5)而在任务界面中查看 checkpoint 的执行情况:
- Trriggered:表示截止到目前 checkpoint 触发的次数。
- In Progress:表示目前正在执行的 checkpoint 数量。
- Completed:表示成功执行结束的 checkpoint 次数。
- Failed:表示执行失败的 checkpoint 次数,这里显示为 2 的意思是失败了 2 次,这是因为我们现在设置的 checkpoint 间隔时间太短了,只有 10 秒,任务提交上去之后很快就会触发 checkpoint,此时 Flink 任务可能还没有初始化完成,所以会出现一些失败次数,等 Flink 任务正常运行起来之后就没问题了。实际工作中,我们会把 checkpoint 的间隔时间设置为分钟级别,一般是 2 分钟,5 分钟之类的,这样就不会出现这种问题了。
- Restored:0 表示这个任务没有基于之前的 checkpoint 数据启动,显示为 1 表示基于之前的 checkpoint 数据启动。
- Path:表示当前任务的 checkpoint 数据保存目录,注意:同一个任务每次启动生成的 checkpoint 数据目录都不一样,因为这个路径里面用到了 flink 的任务 id,任务 id 是每次都会重新生成的。

(6)此时到 hdfs 中查看一下具体生成的 checkpoint 目录:
hdfs dfs -ls hdfs://192.168.121.128:9000/flink/checkpoints/19a1faf69b80e67b5d220f5262d499bc/

4,手动恢复测试
(1)接下来我们来手工停止任务。
注意:如果是由于任务内部故障导致的任务停止,则 Flink 会基于默认的重启策略自动重启,在自动重启的时候会自动使用最新生成的那一份 checkpoint 数据进行恢复。

(2)但是刚才是我们手工停止的任务,那么任务在重启的时候想要恢复数据就需要手工指定从哪一份 checkpoint 数据启动。
注意:-s 后面指定的是某一份 checkpoint 数据,这个时候任务启动的时候会基于这份 checkpoint 数据进行恢复,这个路径必须写 hdfs 的全路径,必须要有 hdfs 路径的前缀,否则会被识别成 linux 本地路径。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/checkpoints/19a1faf69b80e67b5d220f5262d499bc/chk-286/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)当任务正常启动之后,查看任务界面中的 checkpoint 信息:
- 此时图中显示的 Restored 为 1,说明这个任务是基于之前的 checkpoint 数据恢复启动的。
- checkpoint ID 目前为 55,说明这个任务会延续之前的 Checkpoint ID,继续递增增长。
- 最下面的 Latest Restore:ID 为 54,这个编号是启动任务时使用的 Checkpoint 数据的 ID 编号。
- Restore Time 表示恢复的时间。
- Type:SavePoint,这是因为我们是手工重启的,会显示为 Savepoint,如果是任务故障时自动重启的,这里会显示为 checkpoint。具体 Savepoint 什么含义,后面我们会具体分析。
- Path:这个表示启动任务时使用的 checkpoint 数据路径。

(4)此时在 socket 中模拟产生数据:
a b
(5)查看任务日志显示如下信息,说明此任务在启动的时候恢复到了之前的状态,因为上一次任务停止之前,a 出现了 2 次,b 出现了 1 次。这次任务重启后,可以累加之前状态中记录的次数。这样就实现了任务故障后的数据恢复,可以保证流计算中数据的准确性,如果没有使用状态和 checkpoint,当任务重启后,所有的数据都会归 0。
提示:当重启后的程序正常运行后,他还会按照 Checkpoint 的配置进行运行,继续生成 Checkpoint 数据。

5,自动恢复测试
(1)我们继续一下任务在运行期间,任务内部故障导致的任务自动重启时 checkpoint 数据的自动恢复。首先,此时在任务界面中查看一下 taskmanager 节点信息,可看到 taskmanager 进程目前运行在 node1 上。

(2)我们模拟一个故障场景,由于集群节点异常导致 node1 节点宕机了,那么运行在这个节点上的 taskmanager 进程肯定也就没了。在这里我到 node1 上使用 kill 命令直接把 taskmanager 进程杀掉就可以模拟这个场景了。
- 首先在 node1 上执行 jps 命令查看目前的进程信息。

- 确定了是哪个进程之后,使用 kill 命令杀掉进程。
kill -9 39781
(3)这个时候回到任务界面,可以看到界面中已经不显示这个 taskmanager 了:

(4)此时发现任务中有一些失败的 task:

(5)稍等一会任务会自动重启成功:

(6)再看任务界面中的 checkpoint 相关的信息,可以发现此时下面的 restore 部分有信息了,表示是基于之前的第 365 份数据进行启动的,此时 type 就是 checkpoint 了。

(7)这个时候我们在 socket 中再模拟产生一条数据 a
a
(8)再验证一下输出结果,发现是没有问题的,是基于之前的状态进行累加的。
