当前位置: > > > Spark 3.x新特性 - 动态分区裁剪详解

Spark 3.x新特性 - 动态分区裁剪详解

1,动态分区裁剪介绍

(1)当我们针对多个表进行 Join 的时候,动态分区裁剪功能会基于运行时(runtime)推断出来的信息,当 on 后面的查询条件满足一定要求后就会自动对表中的数据进行裁剪(过滤),减少 Join 时参与的数据量,进而提高效率。
提示:通俗来说,动态分区裁剪的核心思想是想办法提前跳过查询结果中不需要的数据,减少 Join 时的数据量,提高效率。

(2)当我们在执如下 Spark SQL 语句的时候,动态分区裁剪的具体执行流程如下下图所示:
ELECT t1.id, t2.key FROM t1 JOIN t2 ON t1.key = t2.key AND t2.id <2

(3)首看左边的这个图:它表示这个 Spark SQL 语句在没有使用动态分区裁剪情况下的执行情况。
  • t1 这个表可以认为是一个包含了多个分区的事实表。t2 这个表可以认为是一个数据量比较小的维度表。
  • Join 的时候 SQL 解析引擎会先对 t2 中的过滤条件做谓词下推优化,也就是说在 Join 之前,先执行 t2 中的过滤条件,减少 Join t2 的数据量。但是 t1 表的数据是不变的,相当于通过 scan 进行了全表扫描,根据全表扫描的结果和 t2 过滤后的结果进行 Join 关联。
  • 如果在 Join 关联的时候能够提前对 t1 表中的数据也进行过滤,这样是可以极大提高 Join 效率的。但是由于之前版本的 Spark 无法动态计算代价,所以会导致对 t1 表全表扫描,最终会扫描出大量的无效数据和 t2 表进行 Join 关联。

(4)接着看右边这个图:它表示 Spark SQL 语句在执行的时候使用了动态分区裁剪功能。
  • 这里在扫描 t1 表中的数据的时候会根据 t2 表过滤后的数据信息,对表 t1 中的数据进行过滤(对应的底层实现就是 t1.key IN (SELECT t2.key FROM t2 WHERE t2.id < 2 )),过滤的时候会将 t1 表中不满足条件的数据过滤掉,只获取需要的那部分数据。这个过程就是动态分区裁剪。

2,触发动态分区裁剪的条件

注意:这个动态分区裁剪操作默认是开启的,但是触发动态分区裁剪是需要一些条件的。
(1)需要裁剪的表必须是分区表,并且分区字段必须在 Join 中的 on 条件里面。

(2)Join 类型必须是 Inner JoinLeft Semi JoinLeft Outer Join 或者 Right Outer Join
  • 针对 Inner JoinJoin 操作左边的表、右边的表可以都是分区表,或者只有某一个表是分区表,至少要有一个表是分区表,这样才能支持裁剪。
  • 针对 Left Semi Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪。
  • 针对 Left Outer Join:需要保证 Join 操作右边的表是分区表,这样才能支持裁剪。
  • 针对 Right Outer Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪。

(3)另一张表里面需要至少存在一个过滤条件。
  • 例如前面案例中的 t2.id<2 这个过滤条件,如果两个表在 join 的时候没有指定过滤条件,那就肯定不会触发动态分区裁剪了。

(4)满足了前面 3 点的要求,也不一定会触发动态分区裁剪,此时还需要在运行时统计这个动态分区裁剪操作的代价,如果做了动态分区裁剪,最终对性能也没有多少提高,那还不如不做。
  • 针对我们这里所说的这个案例,如果针对 t1 表进行过滤后的数据量还是很大,那其实就没有必要进行动态分区裁剪优化了。

3,核心参数

(1)针对动态分区裁剪主要包括下面这 1 个核心参数:
核心参数 默认值 解释
spark.sql.optimizer.dynamicPartitionPruning.enabled true 是否开启动态分区裁剪功能

(2)spark.sql.optimizer.dynamicPartitionPruning.enabled 这个参数默认是 true,所以在工作中使用的时候,如果我们的 Spark SQL 语句和数据可以满足这些要求,就会自动触发动态分区裁剪。

附:案例演示

1,测试未开启动态分区裁剪功能

(1)为了进行对比试验,我们先开发一个未开启动态分区裁剪功能的程序:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DPPScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("DPPScala")
      .config(conf)
      //关闭动态分区裁剪
      .config("spark.sql.optimizer.dynamicPartitionPruning.enabled","false")
      .getOrCreate()

    import sparkSession.implicits._

    /**
     * 创建一个表: t1
     * 1:表中有1000条数据
     * 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
     * 3:这个表是一个分区表,分区字段为key
     */
    sparkSession.range(1000)
      .select($"id", $"id".as("key"))
      .write
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("t1")

    /**
     * 创建一个表: t2
     * 1:表中有10条数据
     * 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
     * 3:这个表是一个普通表
     */
    sparkSession.range(10)
      .select($"id", $"id".as("key"))
      .write
      .mode("overwrite")
      .saveAsTable("t2")

    val sql =
      """
        |select
        | t1.id,
        | t2.key
        | from t1 join t2
        |   on t1.key = t2.key
        |   and t2.id < 2
      """.stripMargin

    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\dpp_"+System.currentTimeMillis())

    while (true) {
      ;
    }
    // sparkSession.stop()
  }
}

(2)运行程序,然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况。

(3)可以看到在 join 之前会先对 t2 中的数据进行过滤,然后和 t1 表中的数据进行 join。这里的执行时间为 8 秒。

2,测试开启动态分区裁剪功能

(1)我们修改代码开启动态分区裁剪功能:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DPPScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("DPPScala")
      .config(conf)
      //关闭动态分区裁剪
      //.config("spark.sql.optimizer.dynamicPartitionPruning.enabled","false")
      .getOrCreate()

    import sparkSession.implicits._

    /**
     * 创建一个表: t1
     * 1:表中有1000条数据
     * 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
     * 3:这个表是一个分区表,分区字段为key
     */
    sparkSession.range(1000)
      .select($"id", $"id".as("key"))
      .write
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("t1")

    /**
     * 创建一个表: t2
     * 1:表中有10条数据
     * 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
     * 3:这个表是一个普通表
     */
    sparkSession.range(10)
      .select($"id", $"id".as("key"))
      .write
      .mode("overwrite")
      .saveAsTable("t2")

    val sql =
      """
        |select
        | t1.id,
        | t2.key
        | from t1 join t2
        |   on t1.key = t2.key
        |   and t2.id < 2
      """.stripMargin

    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\dpp_"+System.currentTimeMillis())

    while (true) {
      ;
    }
    // sparkSession.stop()
  }
}

(2)程序运行后查看 SQL 界面的任务执行流程:

(3)可以看到,在 join 之前会对 t2 中的数据进行过滤,然后再根据 t2 过滤出来的数据对 t1 表中的数据进行过滤,最后对两个表中过滤后的数据进行 join

(4)二者进行对比,可以看到开启了动态分区裁剪的执行时间为 3 秒。之前没有开启动态分区裁剪时的时间是 8s。所以说还是有很大性能提升的。

(5)从 Stage 界面中也可以看到性能的提升。
  • 没有开启动态分区裁剪的时候这个 stage 需要启动 32 task
  • 开启了动态分区裁剪的时候这个 stage 只需要启动 2task
评论0