当前位置: > > > Flink - Checkpoint使用详解3(从Checkpoint进行恢复)

Flink - Checkpoint使用详解3(从Checkpoint进行恢复)

    本文我们结合合具体的案例来演示一下 Checkpoint 是如何对状态数据进行持久化保存。以及当任务故障后,我们又如何基于 Checkpoint 产生的数据进行恢复。

1,编写样例程序

(1)首先我们开发一个有状态的单词计数案例,并且在代码中开启 checkpoint。下面是 Scala 语言实现代码:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object WordCountStateWithCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //开启Checkpoint
    env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
    //在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置Checkpoint后的状态数据的存储位置
    // 设置存储位置(true表示增量快照)
    env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
      true))

    val text = env.socketTextStream("192.168.121.128", 9999)
    import org.apache.flink.api.scala._
    val keyedStream = text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
      //声明一个ValueState类型的状态变量,存储单词出现的总次数
      private var countState: ValueState[Int] = _

      /**
       * 任务初始化的时候这个方法执行一次
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
        //注册状态
        val valueStateDesc = new ValueStateDescriptor[Int](
          "countState",//指定状态名称
          classOf[Int]//指定状态中存储的数据类型
        )
        countState = getRuntimeContext.getState(valueStateDesc)
      }

      override def map(value: (String, Int)): (String,Int) = {
        //从状态中获取这个key之前出现的次数
        var lastNum = countState.value()
        val currNum = value._2
        //如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
        if(lastNum == null){
          lastNum = 0
        }
        //汇总出现的次数
        val sum = lastNum+currNum
        //更新状态
        countState.update(sum)
        //返回单词及单词出现的总次数
        (value._1,sum)
      }

    }).print()

    env.execute("WordCountStateWithCheckpointDemo")
  }
}

(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStateWithCheckpointDemoJava {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启Checkpoint
        env.enableCheckpointing(10000); // 每10秒执行一次
        // 设置任务取消后保留Checkpoint
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置状态后端为RocksDB,并配置存储路径
        env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.121.128:9000/flink/checkpoints",
                true));

        // 获取Socket流数据
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        // 处理数据流
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(tuple -> tuple.f0)
                .map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    // 声明一个ValueState变量
                    private transient ValueState<Integer> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 注册状态
                        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
                                "countState", // 状态名称
                                Integer.class // 状态存储的数据类型
                        );
                        countState = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public Tuple2<String, Integer> map(Tuple2<String, Integer> value)
                            throws Exception {
                        // 获取当前Key的状态值
                        Integer lastNum = countState.value();
                        if (lastNum == null) {
                            lastNum = 0; // 如果状态为空,初始化为0
                        }
                        Integer currNum = value.f1;
                        Integer sum = lastNum + currNum;

                        // 更新状态
                        countState.update(sum);

                        // 返回当前单词及其累计次数
                        return new Tuple2<>(value.f0, sum);
                    }
                }).print();

        // 启动程序
        env.execute("WordCountStateWithCheckpointDemo");
    }
}

2,生成 Jar 包

我们修改项目依赖配置,然后进行打包。最后将生成的 jar 包上传到集群服务器中。具体细节可用参考我之前写的文章:

3,运行任务

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后执行如下命令向集群中提交此任务。
bin/flink run \
 -m yarn-cluster \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)当任务正常启动之后,在 socket 中模拟产生数据:
a b
a

(4)到任务界面查看输出结果信息,可用看到此时 a 出现 2 次,b 出现了 1 次。

(5)而在任务界面中查看 checkpoint 的执行情况:
  • Trriggered:表示截止到目前 checkpoint 触发的次数。
  • In Progress:表示目前正在执行的 checkpoint 数量。
  • Completed:表示成功执行结束的 checkpoint 次数。
  • Failed:表示执行失败的 checkpoint 次数,这里显示为 2 的意思是失败了 2 次,这是因为我们现在设置的 checkpoint 间隔时间太短了,只有 10 秒,任务提交上去之后很快就会触发 checkpoint,此时 Flink 任务可能还没有初始化完成,所以会出现一些失败次数,等 Flink 任务正常运行起来之后就没问题了。实际工作中,我们会把 checkpoint 的间隔时间设置为分钟级别,一般是 2 分钟,5 分钟之类的,这样就不会出现这种问题了。
  • Restored0 表示这个任务没有基于之前的 checkpoint 数据启动,显示为 1 表示基于之前的 checkpoint 数据启动。
  • Path:表示当前任务的 checkpoint 数据保存目录,注意:同一个任务每次启动生成的 checkpoint 数据目录都不一样,因为这个路径里面用到了 flink 的任务 id,任务 id 是每次都会重新生成的。

(6)此时到 hdfs 中查看一下具体生成的 checkpoint 目录:
hdfs dfs -ls hdfs://192.168.121.128:9000/flink/checkpoints/19a1faf69b80e67b5d220f5262d499bc/

4,手动恢复测试

(1)接下来我们来手工停止任务。
注意:如果是由于任务内部故障导致的任务停止,则 Flink 会基于默认的重启策略自动重启,在自动重启的时候会自动使用最新生成的那一份 checkpoint 数据进行恢复。

(2)但是刚才是我们手工停止的任务,那么任务在重启的时候想要恢复数据就需要手工指定从哪一份 checkpoint 数据启动。
注意-s 后面指定的是某一份 checkpoint 数据,这个时候任务启动的时候会基于这份 checkpoint 数据进行恢复,这个路径必须写 hdfs 的全路径,必须要有 hdfs 路径的前缀,否则会被识别成 linux 本地路径。
bin/flink run \
 -m yarn-cluster \
 -s hdfs://192.168.121.128:9000/flink/checkpoints/19a1faf69b80e67b5d220f5262d499bc/chk-286/_metadata \
 -c WordCountStateWithCheckpointDemo \
 -yjm 1024 \
 -ytm 1024 \
 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(3)当任务正常启动之后,查看任务界面中的 checkpoint 信息:
  • 此时图中显示的 Restored1,说明这个任务是基于之前的 checkpoint 数据恢复启动的。
  • checkpoint ID 目前为 55,说明这个任务会延续之前的 Checkpoint ID,继续递增增长。
  • 最下面的 Latest RestoreID54,这个编号是启动任务时使用的 Checkpoint 数据的 ID 编号。
  • Restore Time 表示恢复的时间。
  • TypeSavePoint,这是因为我们是手工重启的,会显示为 Savepoint,如果是任务故障时自动重启的,这里会显示为 checkpoint。具体 Savepoint 什么含义,后面我们会具体分析。
  • Path:这个表示启动任务时使用的 checkpoint 数据路径。

(4)此时在 socket 中模拟产生数据:
a
b

(5)查看任务日志显示如下信息,说明此任务在启动的时候恢复到了之前的状态,因为上一次任务停止之前,a 出现了 2 次,b 出现了 1 次。这次任务重启后,可以累加之前状态中记录的次数。这样就实现了任务故障后的数据恢复,可以保证流计算中数据的准确性,如果没有使用状态和 checkpoint,当任务重启后,所有的数据都会归 0
提示:当重启后的程序正常运行后,他还会按照 Checkpoint 的配置进行运行,继续生成 Checkpoint 数据。

5,自动恢复测试

(1)我们继续一下任务在运行期间,任务内部故障导致的任务自动重启时 checkpoint 数据的自动恢复。首先,此时在任务界面中查看一下 taskmanager 节点信息,可看到 taskmanager 进程目前运行在 node1 上。

(2)我们模拟一个故障场景,由于集群节点异常导致 node1 节点宕机了,那么运行在这个节点上的 taskmanager 进程肯定也就没了。在这里我到 node1 上使用 kill 命令直接把 taskmanager 进程杀掉就可以模拟这个场景了。
  • 首先在 node1 上执行 jps 命令查看目前的进程信息。

  • 确定了是哪个进程之后,使用 kill 命令杀掉进程。
kill -9 39781

(3)这个时候回到任务界面,可以看到界面中已经不显示这个 taskmanager 了:

(4)此时发现任务中有一些失败的 task

(5)稍等一会任务会自动重启成功:

(6)再看任务界面中的 checkpoint 相关的信息,可以发现此时下面的 restore 部分有信息了,表示是基于之前的第 365 份数据进行启动的,此时 type 就是 checkpoint 了。

(7)这个时候我们在 socket 中再模拟产生一条数据 a
a

(8)再验证一下输出结果,发现是没有问题的,是基于之前的状态进行累加的。
评论0