Flink - Savepoint使用详解3(无法恢复的情况以及解决方案)
我在之前的文章中讲到在某些特殊情况下会导致任务无法从 Savepoint 中恢复(点击查看)。下面来针对两个比较常见的故障场景进行分析:
(3)然后执行如下命令向集群中提交此任务。
(4)当任务正常启动之后,在 socket 中模拟产生数据:
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
(8)然后我们停止当前的任务:

(10)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
(11)此时发现任务提交上去之后会自动失败。这是因为不能把 savepoint 中的状态数据映射到 uid 发送了变化的算子中。
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
(3)接着我们再修改代码,增加一个 map 算子:
(4)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
(5)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
(3)然后执行如下命令向集群中提交此任务。
(4)当任务正常启动之后,在 socket 中模拟产生数据:
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
(8)然后我们停止当前的任务:

(10)此时发现任务提交上去之后会自动失败。这是因为改变并行度时,也会影响算子默认生成的 uid。

(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
(3)接着执行如下命令基于之前生成的 savepoint 数据进行恢复。同样的这里我们还通过 -p 指定全局并行度为 2。
(4)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
- 故障情况1:未手工设置 uid,重启时任务中增加了新的算子
- 故障情况2:未手工设置 uid,重启时算子并行度发生了变化
一、故障情况1:未手工设置 uid,重启时任务中增加了新的算子
1,故障现象
(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:
(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:
a b a
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \ 9c5459aeba3cf63d88be7473ba83a260 \ hdfs://192.168.121.128:9000/flink/savepoint \ -yid application_1733037326153_0002
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5


(9)接着我们修改代码,在任务中增加一个算子,其他代码不变。
val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .map(tup=>(tup._1,tup._2)) .keyBy(_._1)
(10)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(11)此时发现任务提交上去之后会自动失败。这是因为不能把 savepoint 中的状态数据映射到 uid 发送了变化的算子中。

(12)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ --allowNonRestoredState \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2,解决方案
(1)想要解决这个问题,我们就需要手工设置算子的 uid,至少是要指定有状态的算子的 uid。首先我们将代码还原成最初状态,然后在有状态的 map 算子后面设置 uid。
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object WordCountStateWithCheckpointDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //开启Checkpoint env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次 //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置Checkpoint后的状态数据的存储位置 // 设置存储位置(true表示增量快照) env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints", true)) val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .keyBy(_._1) keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] { //声明一个ValueState类型的状态变量,存储单词出现的总次数 private var countState: ValueState[Int] = _ /** * 任务初始化的时候这个方法执行一次 * @param parameters */ override def open(parameters: Configuration): Unit = { //注册状态 val valueStateDesc = new ValueStateDescriptor[Int]( "countState",//指定状态名称 classOf[Int]//指定状态中存储的数据类型 ) countState = getRuntimeContext.getState(valueStateDesc) } override def map(value: (String, Int)): (String,Int) = { //从状态中获取这个key之前出现的次数 var lastNum = countState.value() val currNum = value._2 //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0 if(lastNum == null){ lastNum = 0 } //汇总出现的次数 val sum = lastNum+currNum //更新状态 countState.update(sum) //返回单词及单词出现的总次数 (value._1,sum) } }).uid("vs_map001") .print() env.execute("WordCountStateWithCheckpointDemo") } }
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03
(3)接着我们再修改代码,增加一个 map 算子:
val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .map(tup=>(tup._1,tup._2)) .keyBy(_._1)
(4)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \ -m yarn-cluster \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(5)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(6)在 socket 中模拟产生数据:
a
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。

二、故障情况2:未手工设置 uid,重启时算子并行度发生了变化
1,故障现象
(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:
(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)然后执行如下命令向集群中提交此任务。
bin/flink run \ -m yarn-cluster \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:
a b a
(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \ 9c5459aeba3cf63d88be7473ba83a260 \ hdfs://192.168.121.128:9000/flink/savepoint \ -yid application_1733037326153_0002
(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5


(9)接着执行如下命令基于之前生成的 savepoint 数据进行恢复,特别的是这里我们还通过 -p 指定全局并行度为 2。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(11)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ --allowNonRestoredState \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2,解决方案
(1)所以想要支持在恢复状态的时候修改并行度,需要给有状态的算子手工设置 uid。
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object WordCountStateWithCheckpointDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //开启Checkpoint env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次 //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置Checkpoint后的状态数据的存储位置 // 设置存储位置(true表示增量快照) env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints", true)) val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .keyBy(_._1) keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] { //声明一个ValueState类型的状态变量,存储单词出现的总次数 private var countState: ValueState[Int] = _ /** * 任务初始化的时候这个方法执行一次 * @param parameters */ override def open(parameters: Configuration): Unit = { //注册状态 val valueStateDesc = new ValueStateDescriptor[Int]( "countState",//指定状态名称 classOf[Int]//指定状态中存储的数据类型 ) countState = getRuntimeContext.getState(valueStateDesc) } override def map(value: (String, Int)): (String,Int) = { //从状态中获取这个key之前出现的次数 var lastNum = countState.value() val currNum = value._2 //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0 if(lastNum == null){ lastNum = 0 } //汇总出现的次数 val sum = lastNum+currNum //更新状态 countState.update(sum) //返回单词及单词出现的总次数 (value._1,sum) } }).uid("vs_map001") .print() env.execute("WordCountStateWithCheckpointDemo") } }
(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03
(3)接着执行如下命令基于之前生成的 savepoint 数据进行恢复。同样的这里我们还通过 -p 指定全局并行度为 2。
bin/flink run \ -m yarn-cluster \ -p 2 \ -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \ -c WordCountStateWithCheckpointDemo \ -yjm 1024 \ -ytm 1024 \ flink-1.0-SNAPSHOT-jar-with-dependencies.jar
(4)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(5)并且查看任务界面,可以看到并行度变成 2 了。

(6)在 socket 中模拟产生数据:
a
(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
