在 Spark 的物理计划阶段,Spark 的 Join Selection 类会根据 Join hints 策略、Join 表的大小、 Join 是等值 Join 还是不等值以及参与 Join 的 key 是否可以排序等条件来选择最终的 Join 策略,最后 Spark 会利用选择好的 Join 策略执行最终的计算

 Spark 一共支持五种Join 策略

Broadcast hash join (BHJ) 
Shuffle hash join(SHJ) 
Shuffle sort merge join (SMJ) 
Shuffle-and-replicate nested loop join,又称笛卡尔积(Cartesian product join) 
Broadcast nested loop join (BNLJ) 	

Join 的 Key 为等值 Join时,会选择以下三种join策略

Broadcast hash join
Shuffle hash join
Shuffle sort merge join

Join的 Key 为不等值 或者 没有指定Join 条件,会选择以下两种join策略

Broadcast nested loop join
Shuffle-and-replicate nested loop join

不同的 Join 策略在执行上效率差别很大

Hash Join原理(大表 join 小表)

示例代码

select * from order,item where item.id = order.i_id
确定Build Table以及Probe Table:Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。

构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。

匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数(在spark中,实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起

Shuffle Hash Join(大表 join 小表)

当表中的数据比较大,又不适合使用广播,这个时候就可以考虑使用 Shuffle Hash Join

Shuffle Hash Join 同样是在大表和小表进行 Join 的时候选择的一种策略

计算思想是:把大表和小表按照相同的分区算法和分区数进行分区(根据参与 Join 的keys 进行分区)

保证 hash 值一样的数据都分发到同一个分区中,然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join

在进行 Join 之前,还会对小表的分区构建 Hash Map

Shuffle hash join 利用了分治思想,把大问题拆解成小问题去解决

Shuffle Hash Join 的使用条件

仅支持等值 Join,不要求参与 Join 的 Keys 可排序 

spark.sql.join.preferSortMergeJoin 参数必须设置为 false,参数是从 Spark 2.0.0 版本引入的,默认值为 true,也就是默认情况下选择 Sort Merge Join 

小表的大小(plan.stats.sizeInBytes)必须小于 
spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认值 200)而且小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小 
(stats.sizeInBytes),也就是 a.stats.sizeInBytes * 3 < = b.stats.sizeInBytes

Broadcast Hash Join(大表 join 小表)

Broadcast Hash Join 的实现是将小表的数据广播到 Spark 所有的 Executor 端,这个广播过程和我们自己去广播数据没什么区别

利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端 

在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端 

在 Executor 端使用广播的数据与大表进行 Join 操作(实际上是执行map操作) 

这种 Join 策略避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join策略执行的要快。

Broadcast Hash Join 的使用条件

小表的数据必须很小,可以通过 spark.sql.autoBroadcastJoinThreshold 参数来配置,默认是 10MB

如果内存比较大,可以将阈值适当加大

将 spark.sql.autoBroadcastJoinThreshold 参数设置为 -1,可以关闭这种连接方式 

只能用于等值 Join,不要求参与 Join 的 keys 可排序

Shuffle Sort Merge Join(大表 join 大表)

Shuffle Sort Merge Join 的实现思想

将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区

对每个分区内的数据进行排序
 
排序后再对相应的分区内的记录进行连接 
无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢;因为两个序列都有序。从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。从而大大提高了大数据量下sql join的稳定性。

Shuffle Sort Merge Join的使用条件

仅支持等值 Join,并且要求参与 Join 的 Keys 可排序 

Cartesian product join(笛卡尔积)

如果 Spark 中两张参与 Join 的表没指定连接条件,那么会产生 Cartesian product join,这个 Join 得到的结果其实就是两张表行数的乘积

Broadcast nested loop join

可以把 Broadcast nested loop join 的执行看做下面的计算:

for record_1 in relation_1: 
    for record_2 in relation_2: 

# join condition is executed 
可以看出 Broadcast nested loop join 在某些情况会对某张表重复扫描多次,效率非常低下。这种 join 会根据相关条件对小表进行广播,以减少表的扫描次数

触发广播的需要满足以下三个条件之一

right outer join 是会广播左表
left outer, left semi, left anti 或者 existence join 时会广播右表
inner join 的时候两张表都会广播

Broadcast nested loop join的使用条件

支持等值和不等值 Join,支持所有的 Join 类型

 Spark 如何选择Join策略

由于 Spark 的计算引擎优化器不是万能的,有些场景下会选择错误的 Join 策略,所以 Spark 2.4 & Spark 3.0 引入了 Join hint,也就是用户可以自己选择 Join 策略

用户指定的 Join hint 优先级最高

源码中Join 策略选择顺序

如果是等值 Join,会按照下面顺序选择 Join 策略

用户是不是指定了 BROADCAST hint (BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一个),如果指定了,那就用 Broadcast Hash Join

用户是不是指定了 SHUFFLE MERGE hint (SHUFFLE_MERGE、MERGE 以及 MERGEJOIN 中的一个),如果指定了,那就用 Shuffle sort merge join

用户是不是指定了 Shuffle Hash Join hint (SHUFFLE_HASH),如果指定了,那就用 Shuffle Hash Join

用户是不是指定了 shuffle-and-replicate nested loop join hint (SHUFFLE_REPLICATE_NL),如果指定了,那就用 Cartesian product join

如果用户没有指定任何 Join hint,那根据 Join 的适用条件按照 Broadcast Hash Join -> Shuffle Hash Join -> Sort Merge Join ->Cartesian Product Join -> Broadcast Nested Loop Join 顺序选择 Join 策略

如果是不等值 Join,那么是按照下面顺序选择 Join 策略

用户是不是指定了 BROADCAST hint (BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一个),如果指定了,那就广播对应的表,并选择 Broadcast Nested Loop Join

用户是不是指定了 shuffle-and-replicate nested loop join hint (SHUFFLE_REPLICATE_NL),如果指定了,那就用 Cartesian product join

如果用户没有指定任何 Join hint,那根据 Join 的适用条件按照 Broadcast Nested Loop Join ->Cartesian Product Join -> Broadcast Nested Loop Join 顺序选择 Join 策略

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注