searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

spark-sql优化简述

2023-08-28 10:25:14
101
0

1、自适应中reduce参数控制

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。
spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。
spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

2、合理设置单partition读取数据量

SET spark.sql.files.maxPartitionBytes=xxxx;

3、合理设置shuffle partition的数量

SET spark.sql.shuffle.partitions=xxxx

4、使用coalesce & repartition调整partition数量

SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE

5、使用broadcast join

6、开启Adaptive Query Execution(Spark 3.0)

6.1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数
spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数

当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小

6.2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join

6.3、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪

6.4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。
spark.sql.adaptive.enabled: 是否开启AQE优化
倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N
倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M
拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes
G [代表优化之后,分区数数据的预期大小]

sparksql判断出现数据倾斜的依据[需要两个条件同时满足]:
当某个分区处理的数据量>= N * 所有task处理数据量的中位数
当某个分区处理的数据量>= M

7、文件与分区

SET spark.sql.files.maxPartitionBytes=xxx  //读取文件的时候一个分区接受多少数据;
spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值

8、CBO优化

spark.sql.cbo.enabled: 是否开启cbo优化
spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序
spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序

9、hints优化

hints预防主要用在分区和join上。

Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE

Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;

## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;

-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

10、缓存表

对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(TableName)或者DataFrame.cache即可,SparkSQL会用内存列存储的格式进行表的缓存,然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和GC的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用SQLContext.setConf()设置,可以通过

spark.sql.inMemoryColumnarStorage.batchSize

这个参数,默认10000,配置列存储单位。

永久视图 view:永久保存一段查询语句的逻辑,而不是查询语句的数据,永久有效,查询这个视图,相当于查询一个SQL语句,如果保存的查询逻辑复杂,这查询视图也耗时长。支持重新覆盖  create or replace view view1 as
临时视图 temporary view:只在当前会话生效,如果会话结束,则临时视图失效,支持重新覆盖 create or replace temporary view temp_view1 as,类似于 SparkSQL 中的 DataFrame.createOrReplaceTempView('视图名'),hive不支持这个语法
缓存表cache table:只在当前会话有效,将一段查询结果集缓存到内存,并赋予一个表名。
table:永久有效,保存数据结构和数据本身到磁盘。
with as:当子查询的嵌套层数太多时,可以用with as 增加可读性。

11、group by优化

为了提高 group by 查询的性能,可以尝试以下几种方法:
    仅选择必要的字段进行 group by 操作,避免选择过多的字段。
    尽可能将 group by 字段类型保持一致,以减少数据转换的开销。
    如果可能,可以将 group by 字段进行哈希分区,以减少数据传输和处理的开销。
    如果使用的是字符串类型,可以考虑使用哈希函数来减少字符串比较的开销。

12、优化倾斜连接
数据偏斜会严重降低联接查询的性能。此功能通过将倾斜的任务拆分(按需复制)为大小大致相等的任务来动态处理排序合并联接中的倾斜。同时启用spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置时,此选项才生效。

 

0条评论
0 / 1000