当前位置: > > > Flink - 并行度Parallelism详解(附:4种并行度设置方法)

Flink - 并行度Parallelism详解(附:4种并行度设置方法)

一、基本介绍

1,什么是 parallelism(并行度)?

(1)一个 Flink 程序由多个组件组成(SourceTransformationSink)。 一个组件由多个并行实例(线程)来执行, 一个组件的并行实例(线程)数目就被称为该组件的并行度。

(2)举两个例子
  • 比如 kafka 某个 topic 数据量太大,设置了 10 个分区,但 source 端的算子并行度却为 1,只有一个 subTask 去同时消费 10 个分区,明显很慢。此时需要适当的调大并行度。
  • 比如某个算子执行了比较复杂的操作,导致该算子执行特别慢,那么可以考虑给该算子增加并行度。

2,TaskManager 与 Slot

(1)Flink 的每个 TaskManager 为集群提供 SoltSolt 的数量通常与每个 TaskManager 节点的可用 CPU 数量成比例,一般情况下 Slot 的数量就是每个节点的可用 CPU 数量。

(2)slotTaskManager 资源的最小单元。比如 TaskManager5slot,那么每个 slot 分配 25% 的内存,所有 slot 共享 TaskManagercpu。在一个 slot 中可以运行一个或者多个线程。

(3)当然,每个 slot 里并不是只能跑一个算子的一个子任务。实际上,一个 slot 可以跑同一个 job 里面,不同算子的不同子任务。以下图为例:
  • 图中共有 2TaskManager6slot
  • Sourcemap 算子组成了任务链,并行度是 2,跑在了 2slot 中。
  • keyBy()/window()/apply 算子组成了任务链,并行度也是 2,也跑在了 2slot 中。
  • sink 的并行度 是 1,跑在 1slot 中。
    source/map 要和 keyBy 算子分开,他们不能是一个任务链原因在于:keyBy 相当于是分区,得把数据分到不同的算子上,当然不能在一个任务链里面了。

(4)不过上面这样分配的是很不合理的,可能 source/map 算子的任务很轻,分分钟就跑完了,然后 cpu 在那闲着。但是 keyBy/window/apply 算子一直在忙着计算,资源很紧张。事实上,任务可以向下面的图这样分配:
  • source/map 算子 和 keyBy/window/applysink 算子共享了一个 slot 资源。他们的并行度都是 6
  • 所以, flink 任务,最大并行度的那个算子,决定了需要多少个 slot 。把消耗并行度最大的那个算子解决了,其他算子也都没问题。

3,并行度案例分析

(1)下图中一共有 3TaskManager,每个 TaskManager 3slot,此时一共有 9slot

(2)当所有的算子并行度为 1,只需要 1slot 就能解决问题,有 8 个处于空闲。

(3)如果并行度设置为 2,则使用了 2slot

(4)如果设置并行度为 9,所有的 slot 都用到了。


二、设置并行度的方法

1,基本介绍

(1)Flink 任务的并行度可以通过 4 个层面来设置。
  • Operator Level(算子层面)
  • Execution Environment Level(执行环境层面)
  • Client Level(客户端层面)
  • System Level(系统层面)
(2)这些并行度优先级为 Operator Level > Execution Environment Level > Client Level > System Level

2,算子层面设置

算子、数据源和目的地的并行度可以通过调用 setParallelism() 方法来指定。
env.addSource(...)
.map(...).setParallelism(5)
.keyBy(...)
.addSink(...).setParallelism(1)

3,执行环境层面设置

    执行环境层面的并行度可以通过调用 env 变量的 setParallelism() 方法指定。这样设置的并行度是程序中每个算子的并行度,如果算子没有单独覆盖的话,那就是默认是这个全局的并行度了。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setParallelism(10);

4,客户端层面设置

并行度还可以在客户端提交 Job 时设定。通过 -p 参数指定并行度。
bin/flink run -p 10 FlinkDemo.jar

5,系统层面设置

    在 Flinkconf 目录里的 config.yaml 里有一个 parallelism.default 配置属性来指定所有执行环境的默认并行度,默认并行度为 1。我们可以根据需求进行修改。
注意:如果是老版本的 Flink,则是在配置文件 flink-conf.yml 种。
评论0