当前位置: > > > Flink - State状态详解1(基本概念、Keyed State与Operator State)

Flink - State状态详解1(基本概念、Keyed State与Operator State)

一、基本介绍

1,什么是 State?

(1)在流式计算中,State(状态)用于保存任务的中间数据或上下文信息。状态可以帮助任务跟踪事件间的依赖关系,从而实现复杂的业务逻辑,例如聚合、窗口计算或故障恢复。

(2)下图里面显示了 Flink 在对金融数据实现实时累加求和时的业务场景,source 组件对接的是 kafka 中的数据,中间通过算子实现数据累加求和,将聚合后的结果数据存储到 State 中,最终通过 sink 组件将聚合后的结果写出去。

2,Flink 中状态的分类

(2)Keyed State(键控状态)
  • 定义:Keyed State 只能在 KeyedStream 中使用,并且与特定的 key 绑定。
  • 特点:
    • 每个 key 都拥有独立的状态空间。
    • 使用场景:窗口操作、计数器、聚合操作。
  • 常用 API
    • ValueState:存储单个值。
    • ListState:存储一个列表。
    • MapState:存储键值对。
    • ReducingState:使用聚合函数对状态进行合并。
    • AggregatingState:通过自定义聚合逻辑对状态进行处理。

(2)Operator State(算子状态)
  • 定义:Operator State 属于算子范围内的状态,所有任务实例共享状态。
  • 特点:
    • 不依赖 key,可以存储与流无关的元数据信息。
    • 使用场景:偏移量存储、算子初始化、分区分配。
  • 常用 API
    • ListState:存储多个值的列表。

3,状态后端(State Backend)

(1)Flink 的状态管理依赖于状态后端(State Backend)。状态后端负责存储和管理状态数据,并提供检查点(Checkpoint)功能以实现容错。

(2)常见状态后端:
  • MemoryStateBackend:将状态存储在任务管理器的内存中。适用于状态较小的场景。
  • FsStateBackend:将状态存储在文件系统中。性能较高,适用于中等规模的状态。
  • RocksDBStateBackend:将状态存储在 RocksDB 中,支持超大规模状态。适用于需要持久化存储的大规模状态场景。

二、Keyed State 详解

1,基本介绍

(1)Keyed State 是基于 KeyedStream 上的状态,在普通数据流后面调用 keyBy 之后可以获取到一个 KeyedStream 数据流。此时状态是和特定的 key 绑定的。针对 KeyedStream 上的每个 KeyFlink 都会维护一个状态实例。
注意:无论是 Keyed State 还是 Operator StateFlink 的状态都是基于本地的,也就是说每个算子子任务维护着这个子任务对应的状态存储,子任务之间的状态不支持互相访问。

(2)我们可以通过下图进行理解:
  • 图中左边的 Source 表示是数据源,这个组件的并行度为 2,所以 Source 产生了 2task
  • 右边的 Stateful 表示是有状态的算子,这个算子的并行度也是 2,所以也产生了 2task
  • 假如数据源按照 ID 这个列作为 Key 进行了 keyBy 分组,形成了一个 KeyedStream 数据流,其中 ID 的值为 A\B\C\D 这种英文字母。
  • 此时这个数据流中所有 IDAkey 共享一个状态,可以访问和更新这个状态,以此类推,每个 Key 对应一个自己的状态。
  • 在这个图里面 Stateful 1 这个 task 实例维护了 A\B\Y 这些 key 的状态数据。Stateful 2 这个 task 实例维护了 D\E\Z 这些 key 的状态数据。
  • 在这个图里面,Stateful 1Stateful 2 这两个子任务虽然都属于同一个算子,但是他们是 2 个独立的子任务,所以这 2 个子任务之间的状态数据也是不支持互相访问的。

2,Keyed State 中支持的数据结构

(1)ValueState
  • 存储类型为 T 的单值状态,T 是一个泛型。这个状态与对应的 key 绑定,是最简单的状态了。
  • 它里面可以存储任意类型的值,这个值也可以是一个复杂数据结构。它可以通过 update 方法更新状态的值,通过 value() 方法获取状态值。

(2)ListState
  • 表示是一个列表状态,列表里面存储多个类型为 T 的元素。
  • 可以通过 add 方法往列表中添加数据;也可以通过 get() 方法返回一个 Iterable 列表来遍历状态数据。

(3)ReducingState
  • 存储一个聚合后类型为 T 的单值状态,这种状态通过用户传入的 reduceFunction,每次调用 add 方法添加值的时候,会调用 reduceFunction,最后合并到一个单一的状态值,有点类似于 ValueState,都是存储单个数值的。

(4)AggregatingState<IN ,OUT>
  • 存储一个聚合后类型为 OUT 的单值状态,它和 ReducingState 的区别是:
    • ReducingState 在聚合时接收的数据类型和最终产生的聚合结果数据类型是一致的。
    • 但是 AggregatingState 在聚合时接收的数据类型和最终产生的聚合结果数据类型可以不一样,AggregatingState 里面使用了两个泛型,IN 代表聚合时传入的数据类型,OUT 表示最终产生的结果数据类型。

(5)MapState<UK, UV>
  • 可以存储 key-value 类型的多个元素,keyvalue 可以是任何类型。用户可以通过 putputAll 方法添加元素。

三、Operator State 详解

1,基本介绍

(1)Operator State 表示是和算子绑定的状态,与 Key 无关。所以说 Operator State 可以应用在任何类型的数据流上。此时算子的同一个子任务共享一个状态实例,流入这个算子子任务的数据可以访问和更新这个状态实例。
提示:在实际工作中 Operator State 的实际应用场景不如 Keyed State 多。Operator State 经常被用在 SourceSink 组件中,用来保存流入数据的偏移量或者对输出的数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。

(2)我们可以通过下图进行理解:
  • 图中左边的 Source 表示是数据源,这个组件的并行度为 2,会产生 2task
  • 右边的 Stateful 这个有状态的算子的并行度也是 2,对应也会产生 2task
  • 此时 Source-1 的数据都会进入到 Stateful -1 这个子任务中,Stateful -1 会维护一个状态实例,他接收到的 A\B\Y 这几个数据会存储到同一个状态实例中,A\B\Y 这些数据会共享同一个状态实例。
  • 对应的 Source-2 的数据都会进入到 Stateful -2 这个子任务中,Stateful -2 会维护一个状态实例,他接收到的 D\E\Z 这几个数据会存储到同一个状态实例中。D\E\Z 这些数据会共享同一个状态实例。

(3)Operator State 有一个典型的应用场景是 FlinkKafka 中消费数据,这个时候会用到 FlinkKafkaConsumerBase 这个接口。我们知道,针对 kafkaDataSource 可以提供仅一次语句,想要提供仅一次语句,那么这个 DataSource 中就需要维护状态了,通过状态来维护消费偏移量信息。FlinkKafkaConsumerBase 中实际上会维护消费者消费的 topic 名称、分区编号和 offset 偏移量这些信息,这些数据会使用 Operator State 类型的状态进行存储,即 Operator State 中的 UnionListState 这种状态,其实就是一个基于 List 列表的状态,类似下图显示的这样。

2,Operator State 中支持的数据结构

(1)ListState
  • 表示是一个列表类型的状态,存储类型为 T 的多个元素,T 是一个泛型。

(2)UnionListState
  • UnionListState 底层就是 ListState
  • ListStateUnionListState 的区别在于任务故障后恢复数据时:
    • ListState 是将整个状态列表按照负载均衡算法均匀分布到各个算子子任务上,每个算子子任务得到的是整个列表的子集;
    • UnionListState 会按照广播的方式,将整个列表发送给每个算子子任务。

(3)BroadcastState
  • 主要存储 K-V 类型的多个元素,存储的数据格式和 MapState 一样,但是它在恢复数据的时候会广播发送数据。
  • Broadcast State 属于 Operator State 的一种特殊类型,主要是为了实现同一个算子的多个子任务共享一个 State

附:Keyed State 与 Operator State 对比

1,从 State 使用场景这个角度上进行分析

(1)针对 Keyed State,它只能应用在基于 KeyedStream 的数据流中。
  • Flink 中,普通的数据流是 DataStream,在 DataStream 后面调用 keyBy 算子之后,返回的就是 KeyedStream 数据流,那也就是说 Keyed State 只能应用在做过 keyBy 之后的数据流里面。
(2)而 Operator State 可以应用在所有数据流中,包括 keyedStream

2,从 State 分配方式这个角度上进行分析

(1)针对 Keyed State,因为是基于 key 进行分组的数据流,相同 key 的数据会进入到同一个子任务中被处理,此时每个相同的 Key 共享一个 State 实例。
(2)针对 Operator State,需要兼容所有类型的数据流,所以此时算子的同一个子任务共享一个 State 实例,和 key 无关。

3,从 State 创建方式这个角度上进行分析

(1)针对 Keyed State,需要借助于 getRuntimeContext 这个对象来创建。
(2)针对 Operator State,需要借助于 context 这个对象来创建。

4,从 State 扩缩容模式这个角度上进行分析

(1)通俗一点来说,其实就是在任务故障后恢复的时候,算子的并行度发生了变化,可能增加了并行度,或者减少了并行度。
  • 针对无状态的算子,扩缩容很容易,没有什么影响。
  • 但是针对有状态的算子,并行度发生改变之后,状态在恢复的时候会涉及到重新分组,需要将状态数据分配到和之前数量不相等的算子任务中。
(2)针对 Keyed State,它会以 KeyGroup 为单位重新分配状态数据,KeyGroup 其实就是包含了多个 key 的一个分组。
  • 基于 Keyed State 类型状态的算子在扩缩容时会根据新的算子并行度数量对状态重新分配,不过为了降低状态数据在不同任务之间的迁移成本,Flink 对这些状态做了分组,会按照所有状态的 key 进行分组,划分成多个 keyGroup,每个 keyGroup 内部包含一部分 key 的状态,以 keyGroup 为单位重新分配状态数据。

(3)针对 Operator State,它会均匀分配状态数据,或者是广播分配,具体要看你使用的是哪种数据类型了。
  • 如果算子中使用了 Operator State 类型中的 ListState 这种状态,那么算子在扩缩容时会对 ListState 中的数据重新分配。
    • 如下图所示,这个算子的所有并行运行的 task 中的 ListState 数据会被统一收集起来,然后均匀分配给更多的 task 或者更少的 task

  • UnionListState 底层其实就是 ListState,唯一的区别就是在扩缩容时状态数据的分配策略不一样。UnionListState 会在扩缩容时把里面的所有状态数据全部广播发送给新任务。
    • 所以针对 UnionListState 这种方式,任务重启恢复状态数据的时候,每个子任务都会收到所有的数据,但是这个子任务可以根据一定的策略选择操作部分状态数据。

  • BroadcastState 在扩缩容时会把状态广播发送给所有的新任务。这种方式和 UnionListState 区别在于:
    • 针对 UnionListState 这种方式,假设算子 A 的并行度为 2,那么会产生 2task,这 2task 中维护的状态数据是不一样的,当任务重启之后,如果并行度发生了变化,那么算子 A 的每个子任务都可以接收到之前 2task 中维护的状态数据。
    • 针对 BroadcastState 这种方式,假设算子 A 的并行度为 2,那么这 2task 中的数据是完全一样的,当任务重启之后,如果并行度增加了,只需要基于某一个 task 中的状态数据复制到新的 task 中即可。如果任务重启后并行度减少了,只需要简单的去掉多余的 task 即可。
评论0