Spark - RDD使用详解7(血缘关系、依赖关系、阶段划分、任务划分)
九、RDD 的血缘关系、依赖关系、阶段划分、任务划分
1,血缘关系
(1)RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。当一个 RDD 通过一系列的转换操作生成新的 RDD 时,新的 RDD 会记录下它依赖的原始 RDD 以及转换操作的步骤,这样就形成了 RDD 之间的血缘关系。当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
(2)使用 RDD 的 toDebugString() 方法可以查看 RDD 的血缘关系(Lineage)以及转换操作的详细信息。该方法返回一个包含 RDD 血缘关系的字符串表示。
(2)使用 RDD 的 toDebugString() 方法可以查看 RDD 的血缘关系(Lineage)以及转换操作的详细信息。该方法返回一个包含 RDD 血缘关系的字符串表示。
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com")) println(listRDD.toDebugString) println("----------------------") val wordRDD: RDD[String] = listRDD.flatMap(_.split(" ")) println(wordRDD.toDebugString) println("----------------------") val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1)) println(mapRDD.toDebugString) println("----------------------") val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_) println(resultRDD.toDebugString) resultRDD.collect()
2,依赖关系
(1)依赖关系就是两个相邻 RDD 之间的关系。RDD 的依赖关系分为两种类型:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。
- 窄依赖(Narrow Dependency):指父 RDD 的每个分区只被子 RDD 的一个分区所使用,例如 map、filter 等这些算子。
- 宽依赖(Shuffle Dependency):父 RDD 的每个分区都可能被子 RDD 的多个分区使用,例如 groupByKey、reduceByKey,sortBykey 等算子,这些算子其实都会产生 shuffle 操作。
(2)比如下面样例中:
- wordRDD 对 listRDD、mapRDD 对 wordRDD 就是窄依赖关系(无需进行数据的 Shuffle 洗牌操作)
- resultRDD 对 mapRDD 则是宽依赖关系(需进行数据的 Shuffle 洗牌操作)
val listRDD = sc.makeRDD(List("welcome to hangge.com", "hello world", "hangge.com")) println(listRDD.dependencies) println("----------------------") val wordRDD: RDD[String] = listRDD.flatMap(_.split(" ")) println(wordRDD.dependencies) println("----------------------") val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1)) println(mapRDD.dependencies) println("----------------------") val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_) println(resultRDD.dependencies) resultRDD.collect()
(3)窄依赖(Narrow Dependency)指的是一个父 RDD 的每个分区最多只被一个子 RDD 的分区所依赖。在这种情况下,Spark 可以将父 RDD 的每个分区直接映射到子 RDD 的分区上,而无需进行数据的洗牌操作(Shuffle)。窄依赖的转换操作包括 map、filter、union 等。
- 下面示例中,rdd2 是通过对 rdd1 应用 map 转换操作而得到的,转换操作是在每个分区上独立执行的,没有涉及数据的洗牌操作,因此 rdd2 对 rdd1 有窄依赖关系。
// 创建RDD val rdd1 = sc.makeRDD(List(1,2,3,4,5)) // 对RDD进行转换操作 val rdd2 = rdd1.map(_*2)
(4)宽依赖(Wide Dependency)指的是一个父 RDD 的分区被多个子 RDD 的分区所依赖。在这种情况下,Spark 需要进行数据的洗牌操作(Shuffle),将父 RDD 的分区数据重新分发到子 RDD 的分区上。宽依赖的转换操作包括 groupByKey、reduceByKey 等需要数据重组的操作。
- 下面示例中,rdd2 是通过对 rdd1 应用 groupByKey 转换操作而得到的,groupByKey 需要将相同键的数据进行分组,因此需要进行数据的洗牌操作,rdd2 对 rdd1 有宽依赖关系。
// 创建RDD val rdd1 = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(1,"d"),(1,"e"),(2,"f"))) // 对RDD进行转换操作 val rdd2 = rdd1.groupByKey()
3,RDD 阶段划分
(1)RDD 的阶段划分是指将 RDD 的计算流程划分为不同的阶段,每个阶段包含了一组可以并行执行的转换操作。划分阶段的目的是为了减少数据的传输和 shuffle 操作,从而提高计算效率。具体的划分过程如下:
- RDD 的转换操作通常按照依赖关系被划分为不同的阶段。如果一个 RDD 的依赖关系链上存在宽依赖(即父 RDD 的分区和子 RDD 的分区之间存在多对多的依赖关系),那么该 RDD 的计算过程会被划分为多个阶段。
- 划分阶段时,Spark 会根据宽依赖的划分策略将父 RDD 的每个分区映射到子 RDD 的每个分区,以形成多对多的依赖关系。
- RDD 的划分过程还会考虑数据本地性,尽量将具有相同位置的分区放在同一个阶段中,以减少数据的网络传输。
(2)Stage 的划分规则:从后往前,遇到宽依赖就划分 Stage。这是因为 RDD 之间是有血缘关系的,后面的 RDD 依赖前面的 RDD,也就是说后面的 RDD 要等前面的 RDD 执行完,才会执行。所以从后往前遇到宽依赖就划分为两个 stage,shuffle 前一个,shuffle 后一个。如果整个过程没有产生 shuffle 那就只会有一个 stage。以下图为例:
- RDD G 往前推,到 RDD B 的时候,是窄依赖,所以不切分 Stage,再往前到 RDD A,此时产生了宽依赖,所以 RDD A 属于一个 Stage、RDD B 和 G 属于一个 Stage
- 再看下面,RDD G 到 RDD F,产生了宽依赖,所以 RDD F 属于一个 Stage,因为 RDD F 和 RDD C、D、E 这几个 RDD 没有产生宽依赖,都是窄依赖,所以他们属于一个 Stage。
- 所以这个图中,RDD A 单独一个 stage1,RDD C、D、E、F 被划分在 stage2 中,最后 RDD B 和 RDD G 划分在了 stage3 里面。
注意:Stage 划分是从后往前划分,但是 stage 执行时从前往后的,这就是为什么后面先切割的 stage 为什么编号是 3。
(3)DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
4,RDD 任务划分
(1)RDD 任务划分可以细分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application;
- Job:一个 Action 算子就会生成一个 Job;
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意:Application -> Job -> Stage -> Task 每一层都是 1 对 n 的关系。
(2)下面是一个任务划分的图示: