Spark中几种ShuffleWriter的区别你都知道吗?

一.前言

在Spark中有三种shuffle写,分别是BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter。分别对应三种不同的shuffleHandle。

这三者和ShuffleHandle的对应关系如下:

UnsafeShuffleWriter:SerializedShuffleHandle

BypassMergeSortShuffleWriter:BypassMergeSortShuffleHandle,

SortShuffleWriter:BaseShuffleHandle

那么这些shuffle写内部的实现细节有何不同,在什么场景下使用什么样的shuffleWriter呢,接下来我们对着三种ShuffleWriter的实现细节做一个比较。

二.不同shuffleHandle的使用时机

不同shuffleWrite的使用其实是根据shuffleHandle来决定的,在构建shuffleDependence时都会构建shuffleHandle,在registerShuffle方法中,有着对shuffleHandle使用的一个条件约束,因此使用条件也有所不同。

1.对于BypassMergeSortShuffleHandle

map端没有聚合操作,且分区必须小于200

在许多使用场景下,有些算子会在map端先进行一次combine,减少数据传输,而BypassMergeSortShuffleHandle不支持这种操作,因为该handle对应的BypassMergeSortShuffleWriter是开辟和后续RDD分区数量一样数量的小文件,读取每条记录算出它的分区号,然后根据分区号判断应该追加到该文件中,此外这个过程也有缓冲区的概念,但一般这个缓冲区都不会特别大,默认为32k。这也是这种shuffle写不支持map端聚合的一个原因,因为聚合必然要在内存中储存一批数据,将相同key的数据做聚合,而这里是直接开辟多个I/O流,根据分区号往文件中追加数据。

而正因为要同时打开多个文件,所以后续RDD的分区数也不能太多,否则同时打开多个文件,产生多个IO,消耗的资源成本很高。

2.对于SerializedShuffleHandle

map端没有聚合操作,需要Serializer支持relocation,分区数目必须小于 16777216

序列化方式需要支持重定位,即使用KryoSerializer等一些序列化方式。这种方式下用到了Tungsten优化,排序的是二进制的数据,不会对数据进行反序列化操作,所以不支持aggregation。至于为什么我们在后续的实现细节做一个解释。

分区数目必须小于 16777216的原因是,partition number是使用24bit 表示的。

3.对于BaseShuffleHandle

以上情况都不满足时,采用这种ShuffleHandle,对应的ShuffleWrite是SortShuffleWriter,这种形式的支持map端聚合操作,而且也支持排序操作。

三.不同shuffleWrite的实现细节

1.BypassMergeSortShuffleWriter 实现细节

BypassMergeSortShuffleWriter会根据RDD的分区数打开此数量的文件,然后通过rdd的迭代器,迭代出每一条数据,对这些record的分区号进行计算,到当前这条数据写入的分区号,然后写入到该分区对应的文件中。

最后数据迭代完毕,会生成许多分区记录文件,之后将所有分区的数据会合并为同一个文件。此外还会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机访问某个partition的所有数据。

但是需要注意的是,这种方式不宜有太多分区,因为过程中会并发打开所有分区对应的临时文件,会对文件系统造成很大的压力。这种模式下为了减少IO次数,会采用buffer,但是buffer的大小默认为32k,当然这个大小是可以通过spark.shuffle.file.buffer参数自定义配置的。

2.UnsafeShuffleWriter 实现细节

UnsafeShuffleWriter 在内部维护了一块内存,这里的内存分为两部分。一部分是以Page(默认4KB)的形式存在的,存储是真正的记录。另一部分是一个存储数据指针的LongArray数组。这些数据都是被序列化存储的,其中指针是采用了8个字节来代表一条数据,8个字节的定义的数据结构具体存储的信息为24 bit partition number[27 bit offset in page] ,其中存储了这条数据的partition和真实记录数据的指针。

数据记录被传入,先进行序列化,写入到内存页Page中,同时对该数据产生一条指针存储到LongArray数组中,做排序操作,排序操作使用的算法是默认是 RadixSort。

在每次排序比较的时候,只需要线性的查找指针区域的数据,不用根据指针去找真实的记录数据做比较,同时序列化器支持对二进制的数据进行排序比较,不会对数据进行反序列化操作,这样避免了反序列化和随机读取带来的开销,因为不会序列化成对象,可以减少内存的消耗和GC的开销。

UnsafeShuffleWriter中内存管理(申请、释放)工作,由ShuffleExternalSorter来完成。ShuffleExternalSorter还有一个作用就是当内存中数据太多的时候,会先spill到磁盘,防止内存溢出。

之后,如果一个Page内存满了,就会申请内存,如果申请不到内存,就 spill到文件中。

在spill时,会根据指针的顺序溢写,这样就保证了每次溢写的文件都是根据Partition来进行排序的。一个文件里不同的partiton的数据用fileSegment来表示,对应的信息存在 SpillInfo 数据结构中。

最后的merge阶段会根据分区号去每个溢写文件中去拉取对应分区的数据,然后写入一个输出文件,最终合并成一个依据分区号全局有序的大文件。此外还会将每个partition的offset写入index文件方便reduce端拉取数据。

merge阶段,相同分区的都合并到一起,最终返回一个完整的文件,根据压缩,加密的需求有不同的合并方式,有以下两种合并方式:

mergeSpillsWithTransferTo方法,基于Java NIO,通过channel直接传输数据,内部通过两层循环遍历每个文件的每个分区,将分区相同的数据合并到一起。

对应mergeSpillsWithFileStream,使用Java标准的流式IO,它主要用于IO压缩的编解码器不支持级联压缩数据,加密被启动或用户已显示禁止使用transferTo的情况

默认配置情况下会使用mergeSpillsWithTransferTo方式,使用lz4进行压缩,不加密,直接通过NIO的transferTo机制合并。NIO的transferTo()方法在内部实现中,由native方法transferTo0()来实现,它依赖底层操作系统的支持。在UNIX和Linux系统中,调用这个方法将会引起sendfile()系统调用。

通过sendfile实现的零拷贝I/O会减少用户态和内核态之间的切换,同时减少不必要的数据拷贝,在这里在发送sendfile系统调用,会触发一次,用户态和内核态的切换,通过DMA将磁盘上文件的内存拷贝到内核空间缓冲区,然后在内核缓冲区触发文件的合并,之后直接将合并后的文件写入到磁盘中,这时又会触发一次数据的拷贝。等sendfile系统调用返回,又会触发一次用户态和内核态之间的切换。

3.SortShuffleWriter 实现细节

SortShuffleWriter 中的处理步骤如下:
1.使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的Key是(partitionId, hash(key)) 这样一个元组。

2.如果超过内存阈值,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序。

3.如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行mergeSort,进行全局排序

SortShuffleWriter 中使用 ExternalSorter 来对内存中的数据进行排序,ExternalSorter内部维护了两个集合PartitionedAppendOnlyMap、PartitionedPairBuffer,两者的区别如下:

Spark中几种ShuffleWriter的区别你都知道吗?

PartitionedAppendOnlyMap的处理逻辑为,根据key值插入数据,如果对应位置有值且等于原先key,直接进行aggregation操作,更新数据。如果对应位置有值且不等于原先key,则利用线性探查法处理Hash冲突,向后挪动一位插入。

内部的排序是使用优化的排序算法TimSort对PartitionedAppendOnlyMap、PartitionedPairBuffer底层的Array进行排序, 排序的逻辑是,先根据PartitionId,再根据Key的hashCode进行排序。

当每个缓冲区达到内存限制时,会将其写出(spill)到一个中间文件中。当用户请求迭代器或文件输出时,溢出文件将和剩余的内存数据合写成一个有序的文件。这里的核心是partitionedIterator,将已排序的文件序列和内存中的数据合并,返回迭代器,迭代器按partition分组,对每个partition,都有一个遍历其内容的迭代器,按顺序访问数据。

最后会删除所有中间文件。

5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》
上一篇:Flink在大规模状态数据集下的checkpoint调优


下一篇:使用cqlsh远程连接cassandra——设置cassandra.yaml里rpc_address和listen_address为ipv4地址即可