Spark中 join的原理
一、SparkSQL中join的原理
1.1 SparkSQL的5种join策略
概述:spark将参与join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表;通过遍历streamIter表,取出其中的数据,然后到buildIter表中查找满足条件的数据join;
1.1.1 等值连接
Broadcast Hash Join (BHJ)
- 原理:先将小表的数据从executor端 collect拉取到Driver端,然后Driver端调用sparkContext.broadcast将数据广播到计算的executor端;最后在计算的executor端,将广播的表放到hash表中,大表的数据与内存中的hash小表进行join操作;因此避免了shuffle操作,一般而言都要比其他join策略执行的快。
-
适用条件:
- 小表必须很小:通过参数spark.sql.autoBroadcastJoinThreshold控制,默认10M,设为-1就是关闭BHJ;也就是小表要小于10M才可能采用这种方式,但是在获取表的大小时,是可以通过ANALYZE TABLE命令来获取hive表的大小,而对于自己创建的DaraFrame不能通过ANALYZE TABLE获取大小,所以不会广播,因此最好使用bigtab.join(broadcast(smalltab))手动的标记小表;
- 只能用于等值连接:因为广播后要将数据放入hash表中,然后根据连接key的hash值来查找,所以只支持等值连接;
- 基表不能被广播:例如 le.leftJoin(ri),左连接中左表属于基表,广播左表不起作用;
-
不支持full join:因为full join中两个表都需要遍历和查找,所以一个遍历表,一个查找表的模式不太适用;
Shuffle Hash Join (SHJ)
- 原理:对大表和小表使用相同的分区算法和分区数进行分区(根据join key分区),也就是shuffle;这样就保证了相同hash值的数据在同一个分区中,然后对小表的分区构建hash map,最后进行本地的hash join;
-
适用条件:
- 设置参数:要把spark.sql.join.preferSortMergeJoin设为false,默认是true,也就是默认使用SortMergeJoin
- 小表的大小:小表的大小必须小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions,也就是小表的每个分区大小要小于spark.sql.autoBroadcastJoinThreshold;大表必须是小表的三倍以上;
-
只能用于等值连接:与broadcast的原因一样,都是基于hash的;同样也不支持full join;
Shuffle Sort Merge Join (SMJ)
- 原理:对两张表使用相同的分区算法和分区数进行分区(根据join key分区),也就是shuffle;这样就保证了相同hash值的数据在同一个分区中,然后在shuffle read阶段对连接的key排序;最后再对排好序的两个分区数据进行merge join:指向两个表分区首行的指针pa和pb,比较pa和pb中连接key的大小,如果pa < pb,说明pa这条记录join不上,pa往下走一步;如果pa = pb,说明join上了,连接即可,pa和pb都往下走一步;如果pa > pb,说明pb这条记录join不上,pb往下走一步;直到遍历完所有数据;对于左表有多条相同数据的情况,可以将相同数据放到一个(key, list[value])中作为pa;
-
适用条件:
- 要求参与的key可排序:因为SMJ的原理就是先排序,再遍历merge,所以要求连接的key可排序;
- 只支持等值连接:因为是根据连接key排序,然后进行等值比较;所以不支持非等值连接;
- 支持full join:full join的实现只有SMJ;
- 对表的大小没有限制:前两种策略都对小表有大小限制,而这种策略是不限制大小的;
1.1.2 非等值连接
Cartesian Product Jon (CPJ)
- 原理:就是笛卡尔积,如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle;
-
适用条件:
- 必须是inner join:因为两个表的每条记录都会连接,所以左外和右外连接不适用;
-
支持等值和非等值连接:就是先笛卡尔积,然后再逐行判断连接条件,所以对于非等值条件可以可以判断
Broadcast nested loop Join (BNLJ)
- 原理:与Broadcast Hash Join类似,先对小表进行广播,但是不对广播后的小表建立hash表,而是for循环遍历广播表;
-
适用条件:
- 支持等值和非等值连接:由于也是遍历两张表,所以支持非等值连接,并且支持所有join类型;
- 广播表的选择:left out join会广播右表,inner join时两张表都会广播;性能非常低;
1.1.3 join策略的优先级
-
源码位置:org.apache.spark.sql.execution.SparkStrategies文件的JoinSelection类的apply方法;
-
等值连接:Broadcast Hash Join -> Shuffle Hash Join -> Sort Merge Join -> Cartesian Product Join -> Broadcast nested loop Join;按照这个顺序依次检查是否符合join条件,spark2.4可以指定 join hint,也就是指定join的类型;
-
非等值连接:Broadcast nested loop Join ->Cartesian Product Join -> Broadcast nested loop Join;如果有一方可以被广播就选择BNLJ;如果都不能,就判断是不是Inner join,如果是就选择CPJ,否则还是选择BNLJ,并且把两表中较小的那个表作为广播表,因此容易OOM;
二、SparkCore中Join的原理
- 前言:SparkCore的join实现没有SparkSQL那么复杂,底层都是调用了cogroup算子;
2.1 确定Join的分区器Partitioner
- rdd1.join(rdd2):不提供并行度和分区器;则调用defaultPartitioner方法来获取分区器,如果两个rdd的分区器不都为空,则获取分区数较大的rdd的分区器作为 join的分区器;如果都为空,则返回HashPartitioner,分区数为spark.default.parallelism,如果没设置该参数,则用两个rdd中分区较大的作为分区数;
- rdd1.join(rdd2, 5):提供并行度;使用HashPartitioner(5)作为 join的分区器;
- rdd1.join(rdd2, partitioner):提供分区器;则使用提供的分区器作为 join的分区器;
2.2 调用 cogroup算子
- HashPartitioner不能处理Array类型的key:如果分区器是HashPartitioner,并且key是Array类型,就会报错;
-
实例化CoGroupedRDD对象:getDependencies方法用来判断两个rdd的分区器与 join的分区器是否相同
- 相同:返回new OneToOneDependency(rdd);表示该rdd不用shuffle;
- 不相同:返回new ShuffleDependency(rdd, join的分区器);表示该rdd需要用 join的分区器再shuffle一次;
2.3 调用 flatMapValues算子
- 前言:由于cogroup返回的是RDD[(key, (iterable[v1], iterable[v2]))],所以只需要调用 flatMapValues来遍历两个value迭代器,对于不同的 join类型,处理的逻辑也不同;
- inner join:两层循环遍历两个iterable的value,然后返回 (v1, v2);flatMapValues为(k, (v1, v2));
- left join:判断右表的 iterable是否为空,如果为空就返回 (v1, None),后面的与 inner join一样;
- full join:左表为空就返回 (None, v2),右表为空就返回 (v1, None),后面的跟 inner join一样;