Spark 3.x新特性 - 自适应查询执行详解1(自适应调整Shuffle分区数量)
自适应查询执行(Adaptive Query Execution),简称为 AQE。它是对 Spark 执行计划的优化,它可以基于任务运行时统计的数据指标动态修改 Spark 的执行计划。自适应查询执行主要带来了下面这 3 点优化功能:
- 自适应调整 Shuffle 分区数量。
- 动态调整 Join 策略。
- 动态优化倾斜的 Join。
本文首先介绍其中的第一个自适应调整 Shuffle 分区数量。
一、自适应调整 Shuffle 分区数量
1,为什么要自适应调整 Shuffle 分区数量?
(1)Spark 在处理海量数据的时候,其中的 Shuffle 过程是比较消耗资源的,也比较影响性能,因为它需要在网络中传输数据。
(2)Shuffle 中的一个关键属性是:分区的数量。分区的最佳数量取决于数据自身大小,但是数据大小可能在不同的阶段、不同的查询之间有很大的差异,这使得这个数字很难精准调优。
- 如果分区数量太多,每个分区的数据就很小,读取小的数据块会导致 IO 效率降低,并且也会产生过多的 task,这样会给 Spark 任务带来更多负担。
- 如果分区数量太少,那么每个分区处理的数据可能非常大,处理这些大分区的数据可能需要将数据溢写到磁盘(例如:排序或聚合操作),这样也会降低计算效率。
(3)想要解决这个问题,就需要给 Shuffle 设置合适的分区数量,如果手工设置,基本上是无法达到最优效率的。想要达到最优效率,就需要依赖于我们这里所说的自适应调整 Shuffle 分区数量这个策略了。
2,实现的底层策略
(1)Spark 初始会设置一个较大的 Shuffle 分区个数,这个数值默认是 200,后续在运行时会根据动态统计到的数据信息,将小的分区合并,也就是慢慢减少分区数量。
(2)假设我们运行 select flag,max(num) from t1 group by flag 这个 SQL 语句,表 t1 中的数据比较少。
- 现在我们把初始的 Shuffle 分区数量设置为 5,所以在 Shuffle 过程中数据会产生 5 个分区。如果没有开启自适应调整 Shuffle 分区数量这个策略,Spark 会启动 5 个 Reduce 任务来完成最后的聚合。但是这里面有 3 个非常小的分区,为每个分区分别启动一个单独的任务会浪费资源,并且也无法提高执行效率。因为这 3 个非常小的分区对应的任务很快就执行完了,另外 2 个比较大的分区对应的任务需要执行很长时间,资源没有被充分利用到。
- 开启自适应调整 Shuffle 分区数量之后,Spark 会将这 3 个数据量比较小的分区合并为 1 个分区,让 1 个 reduce 任务处理,这个时候最终的聚合操作只需要启动 3 个 reduce 任务就可以了。
3,核心参数
(1)关于自适应调整 Shuffle 分区数量这个机制的核心参数主要包括下面这几个:
核心参数 | 默认值 | 解释 |
spark.sql.adaptive.enabled | true | 是否开启 AQE 机制 |
spark.sql.adaptive.coalescePartitions.enabled | true | 是否开启 AQE 中的自适应调整 Shuffle 分区数量机制 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 67108864b(64M) | 建议的 Shuffle 分区大小 |
(2)各参数的详细说明如下:
- spark.sql.adaptive.enabled:这个参数是控制整个自适应查询执行机制是否开启的,也就是控制 AQE 机制的。默认值是 true,表示默认是开启的。
- spark.sql.adaptive.coalescePartitions.enabled:这个参数才是真正控制自适应调整 Shuffle 分区数量这个机制是否开启的。默认值是 true,表示默认也是开启的。
注意:想要开启这个功能,第1个参数肯定也要设置为 true。因为自适应调整 Shuffle 分区数量这个机制是 AQE 机制中的一个子功能。
- spark.sql.adaptive.advisoryPartitionSizeInBytes:这个参数是控制 shuffle 中分区大小的,默认是 64M。
注意:理论上来说,这个参数越大,shuffle 中最终产生的分区数量就越少,但是也不能太大,太大的话的产生的分区数量就太少了,会导致产生的任务数量也变少,最终会影响执行效率。
附一:案例演示
1,准备测试数据
(1)首先我们编译要给用于生成测试数据的 Java 类,代码如下:
import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.Random; public class GenerateJSONData { public static void main(String[] args) throws Exception{ //自适应调整Shuffle分区数量-生成测试数据 generateCoalescePartitionData(); } private static void generateCoalescePartitionData() throws IOException { String fileName = "D:\\temp\\spark_json_1.dat"; System.out.println("start: 开始生成文件->"+fileName); BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName)); int num = 0; //area = A_US while(num<3000000){ bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000) +"\"," + "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," + "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," + "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," + "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_US\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } //flag = A_ID num = 0; while(num<1000){ bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," + "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," + "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," + "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," + "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_ID\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } //flag = A_IN num = 0; while(num<2000){ bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," + "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," + "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," + "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," + "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_IN\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } //flag = A_KP num = 0; while(num<3000){ bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," + "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," + "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," + "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," + "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_KP\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } //flag = A_JP num = 0; while(num<2800000){ bfw.write("{\"id\":\"14943445328940974601"+new Random().nextInt(1000)+"\"," + "\"uid\":\"840717325115457536\",\"lat\":\"53.530598\",\"lnt\":\"-2.5620373\"," + "\"hots\":0,\"title\":\"0\",\"status\":\"1\",\"topicId\":\"0\",\"end_time\":\"1494344570\"," + "\"watch_num\":"+new Random().nextInt(10000) +",\"share_num\":\"1\",\"replay_url\":null," + "\"replay_num\":0,\"start_time\":\"1494344544\",\"timestamp\":1494344571,\"area\":\"A_JP\"}"); bfw.newLine(); num ++; if(num%10000==0){ bfw.flush(); } } bfw.flush(); bfw.close(); System.out.println("end: 文件已生成"); } }
2,测试关闭自适应调整 Shuffle 分区数量的策略
(1)为了对比试验,首先关闭自适应调整 Shuffle 分区数量的策略。
import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf object AQECoalescePartitionsScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQECoalescePartitionsScala") .config(conf) //禁用AQE机制 .config("spark.sql.adaptive.enabled","false") //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了) .config("spark.sql.adaptive.coalescePartitions.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat") //创建临时表 jsonDf.createOrReplaceTempView("t1") //执行SQL语句 //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。 //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段, //否则shuffle阶段传输的数据量比较小,效果不明显。 val sql = """ |select | id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |from t1 |group by id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)然后使用浏览器访问 http://localhost:4040 打开任务界面中查看任务执行情况:
- 到 Stage 中进行查看。前面 2 个 Stage 在运行时都是产生了 13 个 task,是因为读取的数据是 1655M,按照 128M 切分,正好是 13 个 task。
- 最上面的那个 Stage 是根据 Shuffle 分区之后产生的任务数量,默认情况下 SparkSql 中的 shuffle 分区数量是 200,每个分区会产生一个task,所以这里最终产生了 200 个 task。其实这个 Shuffle 过程中产生的数据量并没有多少,一共只有 87.9M。最终这个 Stage 执行了 1.1 分钟。
(3)可以点进去看一下,每个 task 大致处理 450KB 的数据。
(4)这个时候其实拆分出这么多的分区只会降低计算效率,因为数据量太小了。但是在 Spark 3.0 之前,Spark 无法提前预知中间某一个 Shuffle 的数据量,考虑到 Spark 是处理海量数据的,所以给 SparkSQL 的 Shuffle 分区数量设置了一个比较大的默认值 200。
(5)这些数据指标也可以在 SQL 界面中查看。
提示:
- 在 SQL 界面中查看的其实是这个 SQL 任务的执行流程,或者说是执行计划。
- 在 Stage 界面查看的是SQL转换为 RDD 之后执行的具体情况。
3,测试手工指定 shuffle 分区数量
(1)针对这个没有开启自适应调整 Shuffle 分区数量的代码,或者是 Spark3.0 版本之前的代码,如果想要进行优化,我们可以考虑手工指定 shuffle 分区数量。修改代码,将 Shuffle 分区数量设置为 10。
import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf object AQECoalescePartitionsScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQECoalescePartitionsScala") .config(conf) //手工指定Shuffle分区数量,默认是200 .config("spark.sql.shuffle.partitions","10") //禁用AQE机制 .config("spark.sql.adaptive.enabled","false") //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了) .config("spark.sql.adaptive.coalescePartitions.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat") //创建临时表 jsonDf.createOrReplaceTempView("t1") //执行SQL语句 //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。 //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段, //否则shuffle阶段传输的数据量比较小,效果不明显。 val sql = """ |select | id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |from t1 |group by id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)执行代码,到任务界面中查看效果。
- 此时发现在 Stage 中产生了 10 个 Task,因为我们在代码中设置了 Shuffle 分区数量是 10,所以产生了 10 个 Task,这个是正确的。
- 分区变小之后,整个 Stage 的执行时间也减少了很多,现在只需要执行 28 秒了,还是有很大性能提升的。
(3)但是通过手工调整有 2 个问题:
- 手工设置的 Shuffle 分区数量不一定是最优的。
- 针对每个 SparkSQL 任务都依赖手工调整会非常麻烦。
4,测试自适应调整 Shuffle 分区数量
(1)从 Spark 3.0 开始,引入了 AQE 机制,可以实现自适应调整 Shuffle 分区数量。
注意:默认情况下 AQE 机制是开启的,自适应调整 Shuffle 分区数量功能也是开启的。将之前添加的配置注释掉即可。
import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf object AQECoalescePartitionsScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQECoalescePartitionsScala") .config(conf) //手工指定Shuffle分区数量,默认是200 //.config("spark.sql.shuffle.partitions","10") //禁用AQE机制 //.config("spark.sql.adaptive.enabled","false") //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了) //.config("spark.sql.adaptive.coalescePartitions.enabled","false") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat") //创建临时表 jsonDf.createOrReplaceTempView("t1") //执行SQL语句 //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。 //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段, //否则shuffle阶段传输的数据量比较小,效果不明显。 val sql = """ |select | id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |from t1 |group by id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)执行代码,到任务界面中查看效果。
- 此时发现 Stage 中的 task 数量是 2,这个就表示经过自适应调整 Shuffle 分区数量之后,将分区数量调整成了 2 个。最终产生 2 个 Task。整个 Stage 的执行耗时为 37 秒。
- 其中一个 task 处理 61M 的数据,另外一个 task 处理 26M 的数据。
(3)也可以到 SQL 模块中查看相关指标:
附二:自动调整分区数量依据
1,依据说明
(1)上面测试自适应调整 Shuffle 分区数量机制时,针对我们的测试数据,最终会自动调整成 2 个分区。那为什么不调整成 3 个分区呢?
(2)事实上,在自动调整分区数量的时候,会根据总的 Shuffle 数据量,参考一个分区大小的阈值来切分分区。其实就是 spark.sql.adaptive.advisoryPartitionSizeInBytes 参数。
- 该参数默认是 64M,所以会大致参考 64M 进行切分,最终切分出来一个是 61M,一个是 26M。
提示:其实这个默认参数也没必要修改,针对海量数据的场景,按照 64M 切分也是合理的。
2,测试样例
(1)我们调整一下默认的 Shuffle 分区大小,改为 30M,看一下此时任务会产生多少个 Shuffle 分区。修改后的代码如下:
import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf object AQECoalescePartitionsScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("AQECoalescePartitionsScala") .config(conf) //手工指定Shuffle分区数量,默认是200 //.config("spark.sql.shuffle.partitions","10") //禁用AQE机制 //.config("spark.sql.adaptive.enabled","false") //禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了) //.config("spark.sql.adaptive.coalescePartitions.enabled","false") //设置建议的分区大小 .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "31457280b") .getOrCreate() //读取Json格式数据,获取DataFrame val jsonDf = sparkSession.read.json("D:\\temp\\spark_json_1.dat") //创建临时表 jsonDf.createOrReplaceTempView("t1") //执行SQL语句 //注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。 //在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段, //否则shuffle阶段传输的数据量比较小,效果不明显。 val sql = """ |select | id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |from t1 |group by id,uid,lat,lnt,hots,title,status, | topicId,end_time,watch_num,share_num, | replay_url,replay_num,start_time,timestamp,area |""".stripMargin sparkSession.sql(sql).write.format("json") .save("D:\\temp\\coalesce_partition_"+System.currentTimeMillis()) //让程序一直运行,这样本地的Spark任务界面就可以一直查看 while (true){ ; } } }
(2)执行程序,到任务界面查看效果,此时发现在 Stage 中产生了 4 个 Task,耗时为 29 秒。
(3)接下来看一下每个 task 处理的大致数据量。可以看到最终产生了 3 个 28M 的分区,1 个 2M 的分区,说明我们修改的 spark.sql.adaptive.advisoryPartitionSizeInBytes 这个参数是生效了。