当前位置: > > > Flink - Savepoint使用详解3(无法恢复的情况以及解决方案)

Flink - Savepoint使用详解3(无法恢复的情况以及解决方案)

    我在之前的文章中讲到在某些特殊情况下会导致任务无法从 Savepoint 中恢复(点击查看)。下面来针对两个比较常见的故障场景进行分析:
  • 故障情况1:未手工设置 uid,重启时任务中增加了新的算子
  • 故障情况2:未手工设置 uid,重启时算子并行度发生了变化

一、故障情况1:未手工设置 uid,重启时任务中增加了新的算子

1,故障现象

(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:

(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后执行如下命令向集群中提交此任务。
bin/flink run \
 -m yarn-cluster \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:

(4)当任务正常启动之后,在 socket 中模拟产生数据:
a b
a

(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \
  9c5459aeba3cf63d88be7473ba83a260 \
  hdfs://192.168.121.128:9000/flink/savepoint \
  -yid application_1733037326153_0002

(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5

(8)然后我们停止当前的任务:

 
(9)接着我们修改代码,在任务中增加一个算子,其他代码不变。
val keyedStream = text.flatMap(_.split(" "))
  .map((_, 1))
  .map(tup=>(tup._1,tup._2))
  .keyBy(_._1)

(10)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \
 -m yarn-cluster \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(11)此时发现任务提交上去之后会自动失败。这是因为不能把 savepoint 中的状态数据映射到 uid 发送了变化的算子中。

(12)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \
 -m yarn-cluster \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 --allowNonRestoredState \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2,解决方案

(1)想要解决这个问题,我们就需要手工设置算子的 uid,至少是要指定有状态的算子的 uid。首先我们将代码还原成最初状态,然后在有状态的 map 算子后面设置 uid
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object WordCountStateWithCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //开启Checkpoint
    env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
    //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置Checkpoint后的状态数据的存储位置
    // 设置存储位置(true表示增量快照)
    env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
      true))

    val text = env.socketTextStream("192.168.121.128", 9999)
    import org.apache.flink.api.scala._
    val keyedStream = text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
      //声明一个ValueState类型的状态变量,存储单词出现的总次数
      private var countState: ValueState[Int] = _

      /**
       * 任务初始化的时候这个方法执行一次
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
        //注册状态
        val valueStateDesc = new ValueStateDescriptor[Int](
          "countState",//指定状态名称
          classOf[Int]//指定状态中存储的数据类型
        )
        countState = getRuntimeContext.getState(valueStateDesc)
      }

      override def map(value: (String, Int)): (String,Int) = {
        //从状态中获取这个key之前出现的次数
        var lastNum = countState.value()
        val currNum = value._2
        //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
        if(lastNum == null){
          lastNum = 0
        }
        //汇总出现的次数
        val sum = lastNum+currNum
        //更新状态
        countState.update(sum)
        //返回单词及单词出现的总次数
        (value._1,sum)
      }

    }).uid("vs_map001")
      .print()

    env.execute("WordCountStateWithCheckpointDemo")
  }
}

(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03

(3)接着我们再修改代码,增加一个 map 算子:
val keyedStream = text.flatMap(_.split(" "))
  .map((_, 1))
  .map(tup=>(tup._1,tup._2))
  .keyBy(_._1)

(4)重新编译打包,并将新的 jar 包上传到集群。然后执行如下命令基于之前生成的 savepoint 数据进行恢复。
bin/flink run \
 -m yarn-cluster \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(5)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(6)在 socket 中模拟产生数据:
a

(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。

二、故障情况2:未手工设置 uid,重启时算子并行度发生了变化

1,故障现象

(1)我们还是以前文的有状态的单词计数案例作为演示,具体代码见上文:

(2)接着我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后执行如下命令向集群中提交此任务。
bin/flink run \
 -m yarn-cluster \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 启动后会看到任务 ID 和对应的 yarn applicationid,这个后面手工触发 Savepoint 需要用到:

(4)当任务正常启动之后,在 socket 中模拟产生数据:
a b
a

(5)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(6)接着我们执行如下命令手工触发 savepoint。注意:flink 任务 id 和对应的 yarn applicationid 根据实际情况进行修改。
bin/flink savepoint \
  9c5459aeba3cf63d88be7473ba83a260 \
  hdfs://192.168.121.128:9000/flink/savepoint \
  -yid application_1733037326153_0002

(7)此时到任务界面查看,可以看到这里显示的 savepoint 信息。例如我这里的保存路径就是:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5

(8)然后我们停止当前的任务:

 
(9)接着执行如下命令基于之前生成的 savepoint 数据进行恢复,特别的是这里我们还通过 -p 指定全局并行度为 2
bin/flink run \
 -m yarn-cluster \
 -p 2 \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(10)此时发现任务提交上去之后会自动失败。这是因为改变并行度时,也会影响算子默认生成的 uid

(11)如果我们想忽略这个问题,可以指定 --allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。
bin/flink run \
 -m yarn-cluster \
 -p 2 \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-9c5459-c5c31024cfe5/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 --allowNonRestoredState \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2,解决方案

(1)所以想要支持在恢复状态的时候修改并行度,需要给有状态的算子手工设置 uid
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object WordCountStateWithCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //开启Checkpoint
    env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
    //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置Checkpoint后的状态数据的存储位置
    // 设置存储位置(true表示增量快照)
    env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
      true))

    val text = env.socketTextStream("192.168.121.128", 9999)
    import org.apache.flink.api.scala._
    val keyedStream = text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
      //声明一个ValueState类型的状态变量,存储单词出现的总次数
      private var countState: ValueState[Int] = _

      /**
       * 任务初始化的时候这个方法执行一次
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
        //注册状态
        val valueStateDesc = new ValueStateDescriptor[Int](
          "countState",//指定状态名称
          classOf[Int]//指定状态中存储的数据类型
        )
        countState = getRuntimeContext.getState(valueStateDesc)
      }

      override def map(value: (String, Int)): (String,Int) = {
        //从状态中获取这个key之前出现的次数
        var lastNum = countState.value()
        val currNum = value._2
        //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
        if(lastNum == null){
          lastNum = 0
        }
        //汇总出现的次数
        val sum = lastNum+currNum
        //更新状态
        countState.update(sum)
        //返回单词及单词出现的总次数
        (value._1,sum)
      }

    }).uid("vs_map001")
      .print()

    env.execute("WordCountStateWithCheckpointDemo")
  }
}

(2)然后我们成 jar 包还是按照上面步骤提交任务,产生一些测试数据,接着手工触发 savepoint,然后取消任务。假设这次保存路径如下:
hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03

(3)接着执行如下命令基于之前生成的 savepoint 数据进行恢复。同样的这里我们还通过 -p 指定全局并行度为 2
bin/flink run \
 -m yarn-cluster \
 -p 2 \
 -s hdfs://192.168.121.128:9000/flink/savepoint/savepoint-0546f8-38817a2d8a03/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(4)此时发现任务可以正常启动。查看任务界面中的信息,可以看到任务是基于之前的 savepoint 数据进行恢复的。

(5)并且查看任务界面,可以看到并行度变成 2 了。

(6)在 socket 中模拟产生数据:
a

(7)然后到任务界面查看输出结果信息。可发现 a 出现次数变成 3,这样就说明任务正常基于 savepoint 的数据恢复到了之前的状态。
评论0