Flink - 并行度Parallelism详解(附:4种并行度设置方法)
一、基本介绍
1,什么是 parallelism(并行度)?
(1)一个 Flink 程序由多个组件组成(Source、Transformation 和 Sink)。 一个组件由多个并行实例(线程)来执行, 一个组件的并行实例(线程)数目就被称为该组件的并行度。
(2)举两个例子
- 比如 kafka 某个 topic 数据量太大,设置了 10 个分区,但 source 端的算子并行度却为 1,只有一个 subTask 去同时消费 10 个分区,明显很慢。此时需要适当的调大并行度。
- 比如某个算子执行了比较复杂的操作,导致该算子执行特别慢,那么可以考虑给该算子增加并行度。
2,TaskManager 与 Slot
(1)Flink 的每个 TaskManager 为集群提供 Solt, Solt 的数量通常与每个 TaskManager 节点的可用 CPU 数量成比例,一般情况下 Slot 的数量就是每个节点的可用 CPU 数量。
(2)slot 是 TaskManager 资源的最小单元。比如 TaskManager 有 5 个 slot,那么每个 slot 分配 25% 的内存,所有 slot 共享 TaskManager 的 cpu。在一个 slot 中可以运行一个或者多个线程。
(3)当然,每个 slot 里并不是只能跑一个算子的一个子任务。实际上,一个 slot 可以跑同一个 job 里面,不同算子的不同子任务。以下图为例:
- 图中共有 2 个 TaskManager,6 个 slot。
- Source 和 map 算子组成了任务链,并行度是 2,跑在了 2 个 slot 中。
- keyBy()/window()/apply 算子组成了任务链,并行度也是 2,也跑在了 2 个 slot 中。
- sink 的并行度 是 1,跑在 1 个 slot 中。
source/map 要和 keyBy 算子分开,他们不能是一个任务链原因在于:keyBy 相当于是分区,得把数据分到不同的算子上,当然不能在一个任务链里面了。

(4)不过上面这样分配的是很不合理的,可能 source/map 算子的任务很轻,分分钟就跑完了,然后 cpu 在那闲着。但是 keyBy/window/apply 算子一直在忙着计算,资源很紧张。事实上,任务可以向下面的图这样分配:
- source/map 算子 和 keyBy/window/apply 和 sink 算子共享了一个 slot 资源。他们的并行度都是 6。
- 所以, flink 任务,最大并行度的那个算子,决定了需要多少个 slot 。把消耗并行度最大的那个算子解决了,其他算子也都没问题。

3,并行度案例分析
(1)下图中一共有 3 个 TaskManager,每个 TaskManager 3 个 slot,此时一共有 9 个 slot。

(2)当所有的算子并行度为 1,只需要 1 个 slot 就能解决问题,有 8 个处于空闲。

(3)如果并行度设置为 2,则使用了 2 个 slot。

(4)如果设置并行度为 9,所有的 slot 都用到了。


二、设置并行度的方法
1,基本介绍
(1)Flink 任务的并行度可以通过 4 个层面来设置。
- Operator Level(算子层面)
- Execution Environment Level(执行环境层面)
- Client Level(客户端层面)
- System Level(系统层面)
(2)这些并行度优先级为 Operator Level > Execution Environment Level > Client Level > System Level
2,算子层面设置
算子、数据源和目的地的并行度可以通过调用 setParallelism() 方法来指定。
env.addSource(...) .map(...).setParallelism(5) .keyBy(...) .addSink(...).setParallelism(1)
3,执行环境层面设置
执行环境层面的并行度可以通过调用 env 变量的 setParallelism() 方法指定。这样设置的并行度是程序中每个算子的并行度,如果算子没有单独覆盖的话,那就是默认是这个全局的并行度了。
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setParallelism(10);
4,客户端层面设置
并行度还可以在客户端提交 Job 时设定。通过 -p 参数指定并行度。
bin/flink run -p 10 FlinkDemo.jar
5,系统层面设置
在 Flink 的 conf 目录里的 config.yaml 里有一个 parallelism.default 配置属性来指定所有执行环境的默认并行度,默认并行度为 1。我们可以根据需求进行修改。
注意:如果是老版本的 Flink,则是在配置文件 flink-conf.yml 种。
