编码的优化
select ... from ... where ... group by ... having ... order by ...
1)列裁剪
2)分区裁剪
3)分而治之
比如需要拉取1年的数据,可以把每个月的数据先跑出来,再聚合一年的数据。
4)使用广播
让Spark SQL选择Broadcast Joins
利用配置项强制广播
spark.sql.autoBroadcastJoinThreshold 这个配置项。它的设置值是存储大小,默认是 10MB
对于参与 Join 的两张表来说,任意一张表的尺寸小于 10MB,Spark 就在运行时采用 Broadcast Joins 的实现方式去做数据关联
利用 API 强制广播
用 Join Hints 强制广播Join Hints 中的 Hints 表示“提示”,它指的是在开发过程中使用特殊的语法,明确告知 Spark SQL 在运行时采用哪种 Join 策略
一旦你启用了 Join Hints,不管你的数据表是不是满足广播阈值,Spark SQL 都会尽可能地尊重你的意愿和选择,使用 Broadcast Joins 去完成数据关联
val query: String = “select /*+ broadcast(t2) */ * from t1 inner join t2 on t1.key = t2.key”
val queryResutls: DataFrame = spark.sql(query)
table1.join(table2.hint(“broadcast”), Seq(“key”), “inner”)
5)、使用缓存
with a as ( select id from t1 left join t3 using(id)),
b as (select * from t2 left join t1 using(id))
对于表多次被重复使用的情况,可以考虑cache table ,进行缓存复用
参数的优化
基础参数调优
Spark 性能调优的第一步,就是为任务分配更多的资源
资源的分配在使用脚本提交 Spark 任务时进行指定,标准的 Spark 任务提交脚本
bin/spark-submit \
--class com.xx.xx.xx\
--master yarn
--deploy-mode cluster
--num-executors 100\
--driver-memory 5g \
--executor-memory 5g \
--executor-cores 4 \
xxxxx.jar \
名称 | 说明 |
–num-executors | 配置 Executor 的数量 |
–driver-memory | 配置 Driver 内存( 影响不大) |
–executor-memory | 配置每个 Executor 的内存大小 |
–executor-cores | 配置每个 Executor 的 CPU core 数量 |
调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度
生产配置推荐:
--num-executors:100~200
--driver-memory:1G~5G
--executor-memory:5G~20G
--executor-cores:4-8
动态资源分配
动态资源分配(DRA,dynamic resource allocation)
默认情况下,Spark采用资源预分配的方式
即为每个Spark应用设定一个最大可用资源总量,该应用在整个生命周期内都会持有这些资源
Spark提供了一种机制,使它可以根据工作负载动态调整应用程序占用的资源
不使用的资源,应用程序会将资源返回给集群,并在稍后需要时再次请求资源
如果多个应用程序共享Spark集群中的资源,该特性尤为有用
动态的资源分配是 executor 级默认情况下禁用此功能,并在所有粗粒度集群管理器上可用(CDH发行版中默认为true)
动态申请executor
如果有新任务处于等待状态,并且等待时间超过Spark.dynamicAllocation.schedulerBacklogTimeout(默认 1s),则会依次启动executor,每次启动1、2、4、8…个executor(如果有的话)
启动的间隔由
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 控制 (默认与schedulerBacklogTimeout相同)
动态移除executor:
executor空闲时间超过 spark.dynamicAllocation.executorIdleTimeout 设置的值(默认60s),该executor会被移除,除非有缓存数据
相关参数:
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.executorIdleTimeout(默认60s)。Executor闲置了超过此持续时间,将被删除
spark.dynamicAllocation.cachedExecutorIdleTimeout(默认infifinity)。已缓存数据块的 Executor 闲置了超过此持续时间,则该执行器将被删除
spark.dynamicAllocation.initialExecutors(默认spark.dynamicAllocation.minExecutors)。初始分配Executor 的个数。如果设置了--num-executors(spark.executor.instances)并且大于此值,该参数将作为Executor 初始的个数
spark.dynamicAllocation.maxExecutors(默认infifinity)。Executor 数量的上限
spark.dynamicAllocation.minExecutors(默认0)。Executor 数量的下限
spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)。任务等待时间超过了此期限,则将请求新的 Executor
设置合理的shuffle并行度
shuffle partitions的数量默认为200
这个参数是需要进行调整的而且Spark中并不会根据数据量进行动态的设置,需要我们基于数据规模进行调整。
小数据集,200数值显然有些大,大部分时间都花费在调度,任务执行本身花费时间较小
大数据集,200数值有些小,不能充分利用集群资源
建议设置初始分区的1.5-2倍之间
更改方式
spark.sql(“set spark.sql.shuffle.partitions=100”)