Flink - Checkpoint使用详解1(工作机制、端到端一致性实现原理)
Flink 是一个分布式的流处理引擎,而流处理的其中一个特点就是 7X24。为了保障 Flink 作业的持续运行。Flink 的内部会将应用状态(state)存储到本地内存或者嵌入式的 kv 数据库(RocksDB)中,由于采用的是分布式架构,Flink 需要对本地生成的状态进行持久化存储,以避免因应用或者节点机器故障等原因导致数据的丢失,Flink 是通过 checkpoint(检查点)的方式将状态写入到远程的持久化存储,从而就可以实现不同语义的结果保障。
一、基本介绍
1,什么是 Checkpoint?
(1)Checkpoint 是 Flink 实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的 State 来生成快照,从而将这些 State 数据定期持久化存储下来(比如 HDFS)。
(2)当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
2,检查点的生成逻辑
(1)我们以下图说明检查点的生成:
- 左边的输入流是用户行为数据,包括购买(buy)和加入购物车(cart)两种,每种行为数据都有一个偏移量,统计每种行为的个数。
- 中间的 [cart,3] 表示目前消费者消费到的那条数据及对应的偏移量,这个信息会存储在基于内存的状态中。
- 右边的 [count(buy), 1]、[count(cart), 3],这些是实时汇总的结果,这些数据也会存储在基于内存的状态中。
- Flink 触发执行 Checkpoint 之后会把内存中存储的状态数据写入到下面的持久化存储中。

(2)checkpoint 的执行流程如下:
- ①:当消费到 [cart,3] 这条数据时,正好达到了 checkpoint 的执行时机,此时 JobManager 中的 checkpoint coordinator 会触发 checkpoint 开始执行。 此时状态中存储的消费偏移量是 4
- ②:checkpoint 真正开始执行的时候,他会先把状态中维护的消费偏移量写入到持久化存储中。
- ③:写入结束后,DataSource 组件会把状态的存储路径信息反馈给 JobManager 中的 checkpoint coordinator。
- ④、⑤、⑥、⑦:接着后面算子中的状态数据:[count(buy), 1]、[count(cart), 3] 也会进行同样的步骤
- ⑧:等所有的算子都完成了状态数据的持久化存储,也就是说 checkpoint coordinator 收集到了所有任务反馈给他的状态存储路径,这个时候就认为这一次的 checkpoint 真正执行成功了,最后他会向持久化存储中再备份一个 checkpoint metadata 元数据文件,那么本次整个 checkpoint 流程就完成了。如果中间有任何一个阶段不成功,那么本次 checkpoint 就宣告失败。,如果中间有一个不成功,那么本次 checkpoin 就宣告失败。
- 当达到下一次 checkpoint 执行时机的时候,会继续重复前面的执行流程。
3,检查点的恢复逻辑
(1)继续接着前面的业务流程,前面我们在消费完第 4 条数据的时候触发了一次 checkpoint。checkpoint 执行结束后,紧接着消费者开始消费第 5 条数据,当把第 5 条数据 buy 消费出来之后,在计算的时候由于资源问题导致出现了故障,此时任务异常结束了。

(2)任务结束后,Flink 尝试重启任务,并恢复数据到之前的状态。在最开始重启任务的时候,任务中基于内存的状态都是空的。

(3)当任务重新启动之后,会根据指定的快照数据进行恢复,此时上一次在快照时保存的偏移量 3、[count(buy), 13]、[count(cart), 3] 这些数据对应的都恢复到了正确的位置。

(4)恢复成功之后,任务会基于之前的偏移量 3 继续往后面消费,所以又把 [buy,4] 这条数据消费出来了。此时算子中计算的结果,count(buy)就变成了 2。这就是正常的数据处理流程了。

4,检查点分界线(Checkpoint Barrier)
(1)Flink 分布式快照的一个核心元素是 Checkpoint Barrier。这些 barrier 会被注入到数据流中,并作为数据流的一部分与记录一起流动。
- 当 Flink 作业设置了检查点时,Flink 会在数据流中插入这些特殊记录,以确保在特定点上所有算子的状态都被一致地保存。
- barrier 永远不会超过记录,它们严格地按顺序流动。
- barrier 将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集,相当于将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。
- 每个 barrier 都携带着包含了在它前面的记录的快照的 ID。
- barrier 不会中断数据流,因此非常轻巧。
- 来自不同快照的多个 barrier 可以同时在数据流中,这意味着多种快照可能并发发生。
- 整个过程是由 Flink 的执行引擎在运行时负责处理的,通过协调不同操作符之间的信号和状态来实现数据流中的 checkpoint barrier 插入。
(2)如下图所示,Checkpoint Barrier 被插入到数据流中,它将数据流切分成段。Flink 的 Checkpoint 逻辑是,一段新数据流入导致状态发生了变化,Flink 的算子接收到 Checpoint Barrier 后,对状态进行快照。每个 Checkpoint Barrier 有一个 ID,表示该段数据属于哪次 Checkpoint。如图所示,当 ID 为 n 的 Checkpoint Barrier 到达每个算子后,表示要对 n-1 和 n 之间状态的更新做快照。Checkpoint Barrier 有点像 Event Time 中的 Watermark,它被插入到数据流中,但并不影响数据流原有的处理顺序。

5,Exactly-Once(精确一次)的实现原理
(1)Flink 提供了精确一次的处理语义,精确一次的处理语义可以理解为:数据可能会重复计算,但是结果状态只有一个。Flink 通过 Checkpoint 机制实现了精确一次的处理语义:
- Flink 在触发 Checkpoint 时会向 Source 端插入 checkpoint barrier,checkpoint barriers 是从 source 端插入的,并且会向下游算子进行传递。
- checkpoint barriers 携带一个 checkpoint ID,用于标识属于哪一个 checkpoint,checkpoint barriers 将流逻辑是哪个分为了两部分。
- 对于双流的情况,通过 barrier 对齐的方式实现精确一次的处理语义。
(2)下面对 checkpoint 过程进行分解:
- 图 1 包括两个流,每个任务都会消费一条用户行为数据(包括购买(buy)和加购(cart)),数字代表该数据的偏移量,count buy 任务统计购买行为的个数,coun cart 统计加购行为的个数。

- 图 2,触发 checkpoint,JobManager 会向每个数据源发送一个新的 checkpoint 编号,以此来启动检查点生成流程。

- 图 3,当 Source 任务收到消息后,会停止发出数据,然后利用状态后端触发生成本地状态检查点,并把该 checkpoint barrier 以及 checkpoint id 广播至所有传出的数据流分区。状态后端会在 checkpoint 完成之后通知任务,随后任务会向 Job Manager 发送确认消息。在将 checkpoint barrier 发出之后,Source 任务恢复正常工作。

- 图 4,Source 任务发出的 checkpoint barrier 会发送到与之相连的下游算子任务,当任务收到一个新的 checkpoint barrier 时,会继续等待其他输入分区的 checkpoint barrier 到来,这个过程称之为 barrier 对齐,checkpoint barrier 到来之前会把到来的数据线缓存起来。

- 图 5,任务收齐了全部输入分区的 checkpoint barrier 之后,会通知状态后端开始生成 checkpoint,同时会把 checkpoint barrier 广播至下游算子。

- 图 6,任务在发出 checkpoint barrier 之后,开始处理因 barrier 对齐产生的缓存数据,在缓存的数据处理完之后,就会继续处理输入流数据。

- 图 7,最终 checkpoint barrier 会被传送到 sink 端,sink 任务接收到 checkpoint barrier 之后,会向其他算子任务一样,将自身的状态写入 checkpoint,之后向 Job Manager 发送确认消息。Job Manager 接收到所有任务返回的确认消息之后,就会将此次检查点标记为完成。

附、Kafka+Flink+Kafka 实现端到端一致性
前面分析了 Flink 任务如何实现端到端的一致性,下面以 Kafka + Flink + Kafka 这个工作中常见的应用场景进行分析。
- Source 端:使用 Kafka Consumer,将消费偏移量作为状态保存
- 系统内部:依赖于 Checkpoint 机制实现
- Sink 端:使用 Kafka Producer,采用两阶段提交实现事务写入
这样,整个 Kafka + Flink + Kafka 这个流程就可以实现端到端的一致性。下面我将详细分析一下整个流程的实现。
1,第一步
启动初始化任务。在这里先介绍一下任务整个链条的基本情况。
- 数据源是 kafka,这个任务会从 kafka 的一个 topic 里面获取数据。
- 这个 topic 有 2 个 partition,每个 partition 都含有 “A”, “B”, “C”, ”D”, “E” 5 条消息。
- 任务第一次启动的时候,针对这 2 个 partition 的初始消费偏移量(offset)都是 0。
- 针对这 2 个 partition,对应产生了 2 个消费者,Source-1 和 Source-2。

2,第二步
消费者开始从 partition 0 读取数据,数据 A 被读取出来,此时第一个消费者的 offset 变成了 1。

3,第三步
(1)此时,数据 A 到达了 Map 算子中。
(2)接着两个消费者都开始读取他们下一条消息(partition 0 对应的消费者读取了消息 B,partition 1 对应的消费者读取了消息 A)。各自将消费者的 offset 更新成 2 和 1 。
(3)此时,正好达到了触发 Checkpoint 的时机

4,第四步
(1)在触发 Checkpoint 的时候,JobManager 中的 Checkpoint Coordinator 会在 Source 数据源中插入 Barrier 标记。
- 由于有 2 份数据流,每个数据流中都会插入一个 Barrier 标记,此时 Barrier 标记中的 checkpoint id 可以认为是 1。
- 针对 partiton 0 这个数据流,当消费者将数据 B 消费出来之后,会在数据 B 后面插入一个 Barrier 标记,对应的 checkpoint id 为 1。
- 针对 partition 1 这个数据流,当消费者将数据 A 消费出来之后,会在数据 A 后面插入一个 Barrier 标记,对应的 checkpoint id 也是 1。
- 具体 Barrier 标记会插入到哪个位置,要看触发 checkpoint 的时候消费到了哪条数据,Checkpoint Coordinator 就会将 Barrier 标记插入到触发 checkpoint 时消费的那条数据后面。
(2)Source 组件被触发了 Checkpoint 之后,Kafka 消费者开始将它的状态生成快照,保存到持久化存储中,状态中维护的主要是分区以及消费偏移量信息。对应的就是 <Partition0,2> 和 <Partition1,1> 这种数据,
注意:在这里其实省略了 topic 的名称,状态中也会保存 topic 名称的。
(4)随着数据的继续流动
- 此时 partition 0 对应的消费者消费出来的数据 B 已经到达了 Map 算子中,它后面的 Barrier 标记也进入到 Map 算子中,所以它又将数据 C 消费出来了,数据 C 此时即将到达 Map 算子中,之前收到的数据 A 已经处理过并且发送到下游了。
- partition 1 对应的消费者消费出来的数据 A 即将到达 Map 算子中
注意:此时 map 算子暂时不会触发 checkpoint 流程,它需要在接收到所有数据流中的 Barrier 标记之后才会执行,这样才可以保证数据一致性。

5,第五步
(1)当 Map 算子收到了所有数据流中的 Barrier 标记之后,就实现了 Barrier 对齐,这样就可以触发 Map 算子的 Checkpoint 了,它会将自己维护的状态生成快照存储到下面这个持久化存储中。
- 当然了,Map 算子中也不一定必须要维护状态,这个要根据具体的业务需求,有的业务需求在 map 算子中也不需要维护状态。
(2)此时消费者会继续消费数据,partition 0 对应的消费者将数据 D 消费出来。partition 1 对应的消费者将数据 B 消费出来。
注意:
- 当 Map 算子收到 partition 0 对应的那个数据流中的 Barrier 标记之后,partition 0 对应的消费者还会继续消费后面的数据,此时数据 C 已经达到了 Map 算子中,但是数据 C 不会被立刻计算,他会被缓存起来。
- 当所有的 Barrier 标记到齐之后,触发了 Checkpoint,Map 算子才会去计算这些缓存起来的数据,这些缓存的数据计算完以后再计算新消费过来的数据。因为 Barrier 标记后面的数据属于下一个 Ckeckpoint。

6,第六步
(1)接下来 Barrier 标记会继续向下游流动。
(2)同时 Sink 组件,其实就是 Kafka 生产者(Kafka Producer),它会将自己收到的数据写入目的地 Kakfa 中。
- 但是注意,此时这些数据属于预提交的事务(这些数据暂时无法被消费,可以认为是临时写入 kafka 中,目前这些数据对外是不可用的)。

7,第七步
当 Sink 组件收到 Barrier 标记时,也会触发 Sink 组件的 checkpoint 流程,它会将它里面的状态数据生成快照,保存到持久化存储中。
注意:如果 Map 算子产生了多个子任务,Sink 组件也需要收到所有 Map 算子子任务的 Barrier 标记之后才会执行 checkpoint 流程。

8,第八步
(1)当所有算子任务的快照都执行完成了,此时也就意味着这一次 Checkpoint 完成了。
(2)JobManager 中的 Checkpoint Coordinator 会向所有子任务发送通知,告诉他们这次 checkpoint 成功完成了。
(3)当 Sink 组件收到这个通知之后,就会执行二次提交,正式提交之前的事务,之前预提交到 Kafka 中的数据就可以被正常消费使用了。这就是 Flink 和 Kafka 实现端到端一致性的整体流程。
