注:Spark3.x不支持Hive On Spark

Yarn配置说明

yarn.nodemanager.resource.memory-mb -- 示例64G
一个NodeManager节点分配给Container使用的内存 

yarn.nodemanager.resource.cpu-vcores -- 示例16core
一个NodeManager节点分配给Container使用的CPU核数

yarn.scheduler.maximum-allocation-mb
单个Container能够使用的最大内存

yarn.scheduler.minimum-allocation-mb
单个Container能够使用的最小内存

Spark配置

单个Executor的CPU核数,由spark.executor.cores参数决定,建议配置为4-6,具体配置为多少,视具体情况而定,原则是尽量充分利用资源

此处单个节点共有16个核可供Executor使用,则spark.executor.core配置为4最合适
spark.executor.memory用于指定Executor进程的堆内存大小,这部分内存用于任务的计算和存储

spark.executor.memoryOverhead用于指定Executor进程的堆外内存,这部分内存用于JVM的额外开销,操作系统开销等

默认情况下spark.executor.memoryOverhead的值等于spark.executor.memory*0.1
推荐配置思路是,先按照单个NodeManager的核数和单个Executor的核数,计算出每个NodeManager最多能运行多少个Executor。在将NodeManager的总内存平均分配给每个Executor,最后再将单个Executor的内存按照大约10:1的比例分配到spark.executor.memory和spark.executor.memoryOverhead
(spark.executor.memory+spark.executor.memoryOverhead)= yarn.nodemanager.resource.memory-mb * (spark.executor.cores/yarn.nodemanager.resource.cpu-vcores)

一个Spark应用的Executor个数的指定方式有两种,静态分配动态分配

1)静态分配

可通过spark.executor.instances指定一个Spark应用启动的Executor个数。这种方式需要自行估计每个Spark应用所需的资源,并为每个应用单独配置Executor个数

2)动态分配

动态分配可根据一个Spark应用的工作负载,动态的调整其所占用的资源(Executor个数)

在生产集群中,推荐使用动态分配。动态分配相关参数如下:

#启动动态分配
spark.dynamicAllocation.enabled    true
#启用Spark shuffle服务
spark.shuffle.service.enabled    true
#Executor个数初始值
spark.dynamicAllocation.initialExecutors    1
#Executor个数最小值
spark.dynamicAllocation.minExecutors    1
#Executor个数最大值
spark.dynamicAllocation.maxExecutors    12
#Executor空闲时长,若某Executor空闲时间超过此值,则会被关闭
spark.dynamicAllocation.executorIdleTimeout    60s
#积压任务等待时长,若有Task等待时间超过此值,则申请启动新的Executor
spark.dynamicAllocation.schedulerBacklogTimeout    1s
#spark shuffle老版本协议
spark.shuffle.useOldFetchProtocol true

Driver配置

spark.driver.memory用于指定Driver进程的堆内存大小,spark.driver.memoryOverhead用于指定Driver进程的堆外内存大小

spark.driver.memoryOverhead=spark.driver.memory*0.1。两者的和才算一个Driver进程所需的总内存大小。
一般情况下,按照如下经验进行调整即可:假定yarn.nodemanager.resource.memory-mb设置为X,
若X>50G,则Driver可设置为12G,
若12G<X<50G,则Driver可设置为4G。
若1G<X<12G,则Driver可设置为1G。 
此处yarn.nodemanager.resource.memory-mb为64G,则Driver的总内存可分配12G,所以上述两个参数可配置为。
spark.driver.memory    10G
spark.yarn.driver.memoryOverhead    2G

Hive SQL执行计划

Explain呈现的执行计划,由一系列Stage组成,这个Stage具有依赖关系,每个Stage对应一个MapReduce Job或者Spark Job,或者一个文件系统操作等

每个Stage由一系列的Operator组成,一个Operator代表一个逻辑操作,例如TableScan Operator,Select Operator,Join Operator等

分组聚合优化

优化思路为map-side聚合

所谓map-side聚合,就是在map端维护一个hash table,利用其完成分区内的、部分的聚合,然后将部分聚合的结果,发送至reduce端,完成最终的聚合
map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率

map-side 聚合相关的参数如下:

--启用map-side聚合
set hive.map.aggr=true;

--hash map占用map端内存的最大比例
set hive.map.aggr.hash.percentmemory=0.5;

--用于检测源表是否适合map-side聚合的条数。
set hive.groupby.mapaggr.checkinterval=100000;

--map-side聚合所用的HashTable,占用map任务堆内存的最大比例,若超出该值,则会对HashTable进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;

Join优化

Hive拥有多种join算法,包括common join,map join,sort Merge Bucket Map Join等

1)common join

Map端负责读取参与join的表的数据,并按照关联字段进行分区,将其发送到Reduce端,Reduce端完成最终的关联操作

2)map join

若参与join的表中,有n-1张表足够小,Map端就会缓存小表全部数据,然后扫描另外一张大表,在Map端完成关联操作

3)Sort Merge Bucket Map Join

若参与join的表均为分桶表,且关联字段为分桶字段,且分桶字段是有序的,且大表的分桶数量是小表分桶数量的整数倍。此时,就可以以分桶为单位,为每个Map分配任务了,Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶

Map Join优化

参与join的两表一大一小,可考虑map join优化。

Map Join相关参数如下:
--启用map join自动转换
set hive.auto.convert.join=true;
--common join转map join小表阈值
set hive.auto.convert.join.noconditionaltask.size

 Sort Merge Bucket Map Join 优化

Sort Merge Bucket Map Join相关参数:
--启动Sort Merge Bucket Map Join优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
--使用自动转换SMB Join
set hive.auto.convert.sortmerge.join=true;

示例

表名大小
dwd_trade_order_detail_inc162900000000(约160g)
dim_user_zip12320000000 (约12g)
两张表都相对较大,可以考虑采用SMBSMB Map Join对分桶大小是没有要求的。下面演示如何使用SMB Map Join。

首先需要依据源表创建两个的有序的分桶表,dwd_trade_order_detail_inc建议分36个bucket,dim_user_zip建议分6个bucket,注意分桶个数的倍数关系以及分桶字段和排序字段

然后开启上述两个优化参数即可
--启动Sort Merge Bucket Map Join优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
--使用自动转换SMB Join
set hive.auto.convert.sortmerge.join=true;

数据倾斜优化

数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。
Hive中的数据倾斜常出现在
1.分组聚合
2.join操作     的场景中

由分组聚合导致的数据倾斜

主要有以下两种优化思路:

1)启用map-side聚合
在map端实现聚合
set hive.map.aggr=true;
--hash map占用map端内存的最大比例
set hive.map.aggr.hash.percentmemory=0.5;

2)启用skew groupby优化
其原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合,第二个MR按照分组字段分区,完成最终聚合
--启用分组聚合数据倾斜优化
set hive.groupby.skewindata=true;

由join导致的数据倾斜

由join导致的数据倾斜问题主要有以下两种优化思路

1)使用map join
在map端实现join操作
--启用map join自动转换
set hive.auto.convert.join=true;
--common join转map join小表阈值
set hive.auto.convert.join.noconditionaltask.size

2)启用skew join优化
set hive.optimize.skewjoin=true;
--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;

需要注意的是,skew join只支持Inner Join。

skew join的原理

 任务并行度优化

对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。在Hive中,无论其计算引擎是什么,所有的计算任务都可分为Map阶段和Reduce阶段。所以并行度的调整,也可从上述两个方面进行调整。

Map阶段并行度

Map端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。Map端的并行度相关参数如下

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 
--一个切片的最大值
set mapreduce.input.fileinputformat.split.maxsize=256000000;

Reduce阶段并行度

默认情况下,Hive会根据Reduce端输入数据的大小,估算一个Reduce并行度

Reduce并行度相关参数如下:

--指定Reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
--Reduce端并行度最大值
set hive.exec.reducers.max;
--单个Reduce Task计算的数据量,用于估算Reduce并行度
set hive.exec.reducers.bytes.per.reducer;

Hive自行估算Reduce并行度,估算逻辑如下:

假设Reduce端输入的数据量大小为totalInputBytes
参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer
参数hive.exec.reducers.max的值为maxReducers
则Reduce端的并行度为:

Reduce端输入的数据量大小,是从Reduce上游的Operator的Statistics(统计信息)中获取的。为保证Hive能获得准确的统计信息,需配置如下参数:

--执行DML语句时,收集表级别的统计信息
set hive.stats.autogather=true;
--执行DML语句时,收集字段级别的统计信息
set hive.stats.column.autogather=true;
--计算Reduce并行度时,从上游Operator统计信息获得输入数据量
set hive.spark.use.op.stats=true;
--计算Reduce并行度时,使用列级别的统计信息估算输入数据量
set hive.stats.fetch.column.stats=true;

小文件合并优化

小文件合并优化,分为两个方面,分别是Map端输入的小文件合并,和Reduce端输出的小文件合并

Map端输入文件合并

合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。目的是防止为单个小文件启动一个Map Task,浪费计算资源

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

Reduce输出文件合并

合并Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少HDFS小文件数量。

--开启合并Hive on Spark任务输出的小文件
set hive.merge.sparkfiles=true;

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注