当前位置: > > > Spark 3.x新特性 - 自适应查询执行详解1(自适应调整Shuffle分区数量)

Spark 3.x新特性 - 自适应查询执行详解1(自适应调整Shuffle分区数量)

    自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
  • 自适应调整 Shuffle 分区数量。
  • 动态调整 Join 策略。
  • 动态优化倾斜的 Join
    本文首先介绍其中的第一个自适应调整 Shuffle 分区数量。

一、自适应调整 Shuffle 分区数量

1,为什么要自适应调整 Shuffle 分区数量?

(1)Spark 在处理海量数据的时候,其中的 Shuffle 过程是比较消耗资源的,也比较影响性能,因为它需要在网络中传输数据。

(2)Shuffle 中的一个关键属性是:分区的数量。分区的最佳数量取决于数据自身大小,但是数据大小可能在不同的阶段、不同的查询之间有很大的差异,这使得这个数字很难精准调优。
  • 如果分区数量太多,每个分区的数据就很小,读取小的数据块会导致 IO 效率降低,并且也会产生过多的 task,这样会给 Spark 任务带来更多负担。
  • 如果分区数量太少,那么每个分区处理的数据可能非常大,处理这些大分区的数据可能需要将数据溢写到磁盘(例如:排序或聚合操作),这样也会降低计算效率。

(3)想要解决这个问题,就需要给 Shuffle 设置合适的分区数量,如果手工设置,基本上是无法达到最优效率的。想要达到最优效率,就需要依赖于我们这里所说的自适应调整 Shuffle 分区数量这个策略了。

2,实现的底层策略

(1)Spark 初始会设置一个较大的 Shuffle 分区个数,这个数值默认是 200,后续在运行时会根据动态统计到的数据信息,将小的分区合并,也就是慢慢减少分区数量。

(2)假设我们运行 select flag,max(num) from t1 group by flag 这个 SQL 语句,表 t1 中的数据比较少。
  • 现在我们把初始的 Shuffle 分区数量设置为 5,所以在 Shuffle 过程中数据会产生 5 个分区。如果没有开启自适应调整 Shuffle 分区数量这个策略,Spark 会启动 5Reduce 任务来完成最后的聚合。但是这里面有 3 个非常小的分区,为每个分区分别启动一个单独的任务会浪费资源,并且也无法提高执行效率。因为这 3 个非常小的分区对应的任务很快就执行完了,另外 2 个比较大的分区对应的任务需要执行很长时间,资源没有被充分利用到。
  • 开启自适应调整 Shuffle 分区数量之后,Spark 会将这 3 个数据量比较小的分区合并为 1 个分区,让 1reduce 任务处理,这个时候最终的聚合操作只需要启动 3 reduce 任务就可以了。

3,核心参数

(1)关于自适应调整 Shuffle 分区数量这个机制的核心参数主要包括下面这几个:
核心参数 默认值 解释
spark.sql.adaptive.enabled true 是否开启 AQE 机制
spark.sql.adaptive.coalescePartitions.enabled true 是否开启 AQE 中的自适应调整 Shuffle 分区数量机制
spark.sql.adaptive.advisoryPartitionSizeInBytes 67108864b64M 建议的 Shuffle 分区大小

(2)各参数的详细说明如下:
  • spark.sql.adaptive.enabled:这个参数是控制整个自适应查询执行机制是否开启的,也就是控制 AQE 机制的。默认值是 true,表示默认是开启的。
  • spark.sql.adaptive.coalescePartitions.enabled:这个参数才是真正控制自适应调整 Shuffle 分区数量这个机制是否开启的。默认值是 true,表示默认也是开启的。
注意:想要开启这个功能,第1个参数肯定也要设置为 true。因为自适应调整 Shuffle 分区数量这个机制是 AQE 机制中的一个子功能。
  • spark.sql.adaptive.advisoryPartitionSizeInBytes:这个参数是控制 shuffle 中分区大小的,默认是 64M
注意:理论上来说,这个参数越大,shuffle 中最终产生的分区数量就越少,但是也不能太大,太大的话的产生的分区数量就太少了,会导致产生的任务数量也变少,最终会影响执行效率。

附一:案例演示

1,准备测试数据

(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class GenerateJSONData {
    public static void main(String[] args) throws Exception{
        //自适应调整Shuffle分区数量-生成测试数据
        generateCoalescePartitionData();
    }
    private static void generateCoalescePartitionData() throws IOException {
        String fileName = "D:\\temp\\spark_json_1.dat";
        System.out.println("start: 开始生成文件->"+fileName);
        BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
        int num = 0;
        //area = A_US
        while(num<3000000){
            bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000) +"\"," +
                    "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," +
                    "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," +
                    "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," +
                    "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_US\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }

        //flag = A_ID
        num = 0;
        while(num<1000){
            bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," +
                    "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," +
                    "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," +
                    "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," +
                    "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_ID\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }

        //flag = A_IN
        num = 0;
        while(num<2000){
            bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," +
                    "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," +
                    "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," +
                    "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," +
                    "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_IN\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }

        //flag = A_KP
        num = 0;
        while(num<3000){
            bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," +
                    "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," +
                    "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," +
                    "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," +
                    "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_KP\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }

        //flag = A_JP
        num = 0;
        while(num<2800000){
            bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," +
                    "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," +
                    "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," +
                    "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," +
                    "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_JP\"}");
            bfw.newLine();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }
        bfw.flush();
        bfw.close();
        System.out.println("end: 文件已生成");
    }
}

(2)然后执行该程序后会在指定目录生成要给 1.6G 的数据文件,里面为 Json 格式的测试数据:

2,测试关闭自适应调整 Shuffle 分区数量的策略

(1)为了对比试验,首先关闭自适应调整 Shuffle 分区数量的策略。
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object AQECoalescePartitionsScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQECoalescePartitionsScala")
      .config(conf)
      //禁用AQE机制
      .config("spark.sql.adaptive.enabled","false")
      //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
      .config("spark.sql.adaptive.coalescePartitions.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat")
    //创建临时表
    jsonDf.createOrReplaceTempView("t1")
    //执行SQL语句
    //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。
    //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段,
    //否则shuffle阶段传输的数据量比较小,效果不明显。
    val sql =
    """
      |select
      |   id,uid,lat,lnt,hots,title,status,
      |   topicId,end_time,watch_num,share_num,
      |   replay_url,replay_num,start_time,timestamp,area
      |from t1
      |group by id,uid,lat,lnt,hots,title,status,
      |         topicId,end_time,watch_num,share_num,
      |         replay_url,replay_num,start_time,timestamp,area
      |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况:
  • Stage 中进行查看。前面 2Stage 在运行时都是产生了 13 task,是因为读取的数据是 1655M,按照 128M 切分,正好是 13 task
  • 最上面的那个 Stage 是根据 Shuffle 分区之后产生的任务数量,默认情况下 SparkSql 中的 shuffle 分区数量是 200,每个分区会产生一个task,所以这里最终产生了 200 task。其实这个 Shuffle 过程中产生的数据量并没有多少,一共只有 87.9M。最终这个 Stage 执行了 1.1 分钟。

(3)可以点进去看一下,每个 task 大致处理 450KB 的数据。

(4)这个时候其实拆分出这么多的分区只会降低计算效率,因为数据量太小了。但是在 Spark 3.0 之前,Spark 无法提前预知中间某一个 Shuffle 的数据量,考虑到 Spark 是处理海量数据的,所以给 SparkSQL Shuffle 分区数量设置了一个比较大的默认值 200

(5)这些数据指标也可以在 SQL 界面中查看。
提示:
  • SQL 界面中查看的其实是这个 SQL 任务的执行流程,或者说是执行计划。
  • Stage 界面查看的是SQL转换为 RDD 之后执行的具体情况。

3,测试手工指定 shuffle 分区数量

(1)针对这个没有开启自适应调整 Shuffle 分区数量的代码,或者是 Spark3.0 版本之前的代码,如果想要进行优化,我们可以考虑手工指定 shuffle 分区数量。修改代码,将 Shuffle 分区数量设置为 10
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object AQECoalescePartitionsScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQECoalescePartitionsScala")
      .config(conf)
      //手工指定Shuffle分区数量,默认是200
      .config("spark.sql.shuffle.partitions","10")
      //禁用AQE机制
      .config("spark.sql.adaptive.enabled","false")
      //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
      .config("spark.sql.adaptive.coalescePartitions.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat")
    //创建临时表
    jsonDf.createOrReplaceTempView("t1")
    //执行SQL语句
    //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。
    //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段,
    //否则shuffle阶段传输的数据量比较小,效果不明显。
    val sql =
    """
      |select
      |   id,uid,lat,lnt,hots,title,status,
      |   topicId,end_time,watch_num,share_num,
      |   replay_url,replay_num,start_time,timestamp,area
      |from t1
      |group by id,uid,lat,lnt,hots,title,status,
      |         topicId,end_time,watch_num,share_num,
      |         replay_url,replay_num,start_time,timestamp,area
      |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)执行代码,到任务界面中查看效果。
  • 此时发现在 Stage 中产生了 10 Task,因为我们在代码中设置了 Shuffle 分区数量是 10,所以产生了 10 Task,这个是正确的。
  • 分区变小之后,整个 Stage 的执行时间也减少了很多,现在只需要执行 28 秒了,还是有很大性能提升的。

(3)但是通过手工调整有 2 个问题:
  • 手工设置的 Shuffle 分区数量不一定是最优的。
  • 针对每个 SparkSQL 任务都依赖手工调整会非常麻烦。

4,测试自适应调整 Shuffle 分区数量

(1)从 Spark 3.0 开始,引入了 AQE 机制,可以实现自适应调整 Shuffle 分区数量。
注意:默认情况下 AQE 机制是开启的,自适应调整 Shuffle 分区数量功能也是开启的。将之前添加的配置注释掉即可。
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object AQECoalescePartitionsScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQECoalescePartitionsScala")
      .config(conf)
      //手工指定Shuffle分区数量,默认是200
      //.config("spark.sql.shuffle.partitions","10")
      //禁用AQE机制
      //.config("spark.sql.adaptive.enabled","false")
      //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
      //.config("spark.sql.adaptive.coalescePartitions.enabled","false")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat")
    //创建临时表
    jsonDf.createOrReplaceTempView("t1")
    //执行SQL语句
    //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。
    //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段,
    //否则shuffle阶段传输的数据量比较小,效果不明显。
    val sql =
    """
      |select
      |   id,uid,lat,lnt,hots,title,status,
      |   topicId,end_time,watch_num,share_num,
      |   replay_url,replay_num,start_time,timestamp,area
      |from t1
      |group by id,uid,lat,lnt,hots,title,status,
      |         topicId,end_time,watch_num,share_num,
      |         replay_url,replay_num,start_time,timestamp,area
      |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)执行代码,到任务界面中查看效果。
  • 此时发现 Stage 中的 task 数量是 2,这个就表示经过自适应调整 Shuffle 分区数量之后,将分区数量调整成了 2 个。最终产生 2Task。整个 Stage 的执行耗时为 37 秒。
  • 其中一个 task 处理 61M 的数据,另外一个 task 处理 26M 的数据。

(3)也可以到 SQL 模块中查看相关指标:


附二:自动调整分区数量依据

1,依据说明

(1)上面测试自适应调整 Shuffle 分区数量机制时,针对我们的测试数据,最终会自动调整成 2 个分区。那为什么不调整成 3 个分区呢?
(2)事实上,在自动调整分区数量的时候,会根据总的 Shuffle 数据量,参考一个分区大小的阈值来切分分区。其实就是 spark.sql.adaptive.advisoryPartitionSizeInBytes 参数。
  • 该参数默认是 64M,所以会大致参考 64M 进行切分,最终切分出来一个是 61M,一个是 26M
提示:其实这个默认参数也没必要修改,针对海量数据的场景,按照 64M 切分也是合理的。

2,测试样例

(1)我们调整一下默认的 Shuffle 分区大小,改为 30M,看一下此时任务会产生多少个 Shuffle 分区。修改后的代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object AQECoalescePartitionsScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //获取SparkSession,为了操作SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("AQECoalescePartitionsScala")
      .config(conf)
      //手工指定Shuffle分区数量,默认是200
      //.config("spark.sql.shuffle.partitions","10")
      //禁用AQE机制
      //.config("spark.sql.adaptive.enabled","false")
      //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
      //.config("spark.sql.adaptive.coalescePartitions.enabled","false")
      //设置建议的分区大小
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "31457280b")
      .getOrCreate()

    //读取Json格式数据,获取DataFrame
    val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat")
    //创建临时表
    jsonDf.createOrReplaceTempView("t1")
    //执行SQL语句
    //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。
    //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段,
    //否则shuffle阶段传输的数据量比较小,效果不明显。
    val sql =
    """
      |select
      |   id,uid,lat,lnt,hots,title,status,
      |   topicId,end_time,watch_num,share_num,
      |   replay_url,replay_num,start_time,timestamp,area
      |from t1
      |group by id,uid,lat,lnt,hots,title,status,
      |         topicId,end_time,watch_num,share_num,
      |         replay_url,replay_num,start_time,timestamp,area
      |""".stripMargin
    sparkSession.sql(sql).write.format("json")
      .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis())

    //让程序一直运行,这样本地的Spark任务界面就可以一直查看
    while (true){
      ;
    }
  }
}

(2)执行程序,到任务界面查看效果,此时发现在 Stage 中产生了 4Task,耗时为 29 秒。

(3)接下来看一下每个 task 处理的大致数据量。可以看到最终产生了 328M 的分区,12M 的分区,说明我们修改的 spark.sql.adaptive.advisoryPartitionSizeInBytes 这个参数是生效了。
评论0