spark的groupbykey算子源码分析

spark的groupbykey算子源码分析

先进到defaultPartitioner里,self接收得是父RDD。

这是defaultPartitioner函数:

spark的groupbykey算子源码分析

这一段代码的主要逻辑是分区器的选取问题,是用上游RDD的分区器还是新建一个分区器。

评判标准就是:

如果现有的最大分区器是合格的,或者其分区数大于或等于默认分区数,请使用现有分区器,关键就是看默认分区数的问题,下图就是代码逻辑。

spark的groupbykey算子源码分析

那么如何找到最大分区器?

它会调用hasMaxPartitioner函数,先用hasPartitioner在上游的RDD集中过滤出分区数大于0的分区,然后在这里面找出最大分区数的分区器作为当前分区器。

那么默认的分区数(也就是并行度)哪来?

调用defaultNumPartitions函数,去判断父RDD是否有默认分区数。如果有就直接拿过来,如果没有就拿之前找到的最大分区器的分区数。

现在分区器怎么选的问题解决了,我们再来看看新建分区,new HashPartitioner 的问题。

分区器决定的是在shuffle的时候上游的数据会被下游的哪个task拉取

spark的groupbykey算子源码分析

getPartition函数决定数据会被拉到哪个分区。如果key是null的,那么就直接甩到0号分区,其他的根据key的hashcode值/下游的分区数,具体决定它会被丢到哪个分区。

下图就是计算分区函数

spark的groupbykey算子源码分析

那么我们能不能自定义一个分区器呢,我们进到本文首部的groupByKey

函数中去看一下

下图是代码逻辑

spark的groupbykey算子源码分析

首先声明一下groupbyKey不是在map端进行全局聚合,map端只是各自分区里面分组来局部聚合。

其中主要就是三个函数

createCombiner:新建一个CompactBuffer集合(类似于ArrayBuffer)它把同一个分组的第一个value塞进去

mergeValue:把同一个分组内剩余的其他value一个个的塞进CompactBuffer

mergeCombiners:进行全局分组,把上游多个分区的相同value的分组的CompactBuffer集合加起来

可以看到combineByKeyWithClassTag函数把createCombiner,mergeValue,mergeCombiners,分区器,mapSideCombine不在map端全局聚合,这五个全丢进去了。

进到函数里面,其实就是新建了一个shuffledRdd 来把那三个函数塞进去。

spark的groupbykey算子源码分析

 

让我们最后看一下groupbykey的逻辑执行流程

spark的groupbykey算子源码分析

groupbukey也是一个算子针对的也是task对应的分区,上游有四个分区,每个分区有各自的数据,比如在Map端的0号分区,(只列出spark)groupbykey会针对它进行局部分组形成 (spark,[1,1])

在一号分区会形成(spark,[1,1]),然后shuffule,下游task来拉取,spark这个key被hashcode计算应该被丢到一号分区,所以shuffle端一号分区的task拉取这俩个分区的数据,进行全局分组在它这边聚合成(spark,[1,1,1,1])(图上写错了,应该是4个)

 

上一篇:mysql中的数据格式


下一篇:SQL SERVER SSIS Data Flow