SparkShuffle机制-概念

.

一 .前言

Shuffle机制分为Shuffle WriteShuffle Read两个阶段,前者主要解决上游stage输出数据的分区问题,后者主要解决下游stage从上游stage获取数据、重新组织、并为后续操作提供数据的问题。

二 .Shuffle的设计思想

2.1. 解决数据分区问题

1)数据分区问题:该问题针对Shuffle Write阶段。如何对map task输出结果进⾏分区,使得reduce task可以通过⽹络获取相应的数据?

数据分区问题解决⽅案:该问题包含两个⼦问题。

  • 第1个问题是如何确定分区个数?

分区个数与下游stage的task个数⼀致 .

分区个数可以由⽤户⾃定义,如groupByKey(numPartitions)中的numPartitions⼀般被定义为集群中可⽤CPU个数的1~2倍,即将每个map task的输出数据划分为numPartitions份,相应地,在reduce stage中启动numPartitions个task来获取并处理这些数据。

如果⽤户没有定义,则默认分区个数是parent RDD的分区个数的最⼤值。

如图下图的左图所⽰,

在没有定义join(numPartitions)中的分区个数numPartitions的情况下,取两个parent RDD的分区的最⼤值为2。

SparkShuffle机制-概念

  • 第2个问题是如何对maptask输出数据进⾏分区?

解决⽅法是对map task输出的每⼀个<K,V> record,

根据Key计算其partitionId,具有不同partitionId的record被输出到不同的分区(⽂件)中

如上图的右图所⽰,下游stage中只有两个task,分区个数为2。map task需要将其输出数据分为两份,⽅法是让map()操作计算每个输出record的partitionId=Hash(Key)%2,根据partitionId将record直接输出到不同分区中。

这种⽅法⾮常简单,容易实现,但不⽀持Shuffle Write端的combine()操作。

2.2. 解决数据聚合问题

该问题针对Shuffle Read阶段,即如何获取上游不同task的输出数据并按照Key进⾏聚合呢?

如groupByKey()中需要将不同task获取到的<K,V>record 聚合为<K,list(V)>(实现时为<K,CompactBuffer(V)>),reduceByKey()将<K,V> record聚合为<K,func(list(V))>。

数据聚合问题解决⽅案:

数据聚合的本质是将相同Key的record放在⼀起,并进⾏必要的计算,这个过程可以利⽤C++/Java语⾔中的HashMap实现。

⽅法是使⽤两步聚合(two-phase aggregation),先将不同tasks获取到的<K,V>record存放到HashMap中,HashMap中的
Key是K,Value是list(V)。然后,对于HashMap中每⼀个<K,list(V)>record,使⽤func计算得到<K,func(list(V))>record。

如图所⽰,join()在Shuffle Read阶段将来⾃不同task的数据以HashMap⽅式聚合在⼀起,由于join()没有聚合函数,将record按Key聚合后直接执⾏下⼀步操作,使⽤cartesian()计算笛卡⼉积。⽽对于reduceByKey(func)来说,需要进⼀步使⽤func()对相同Key的record进⾏聚合。

SparkShuffle机制-概念

如下图所⽰,两步聚合的第1步是将record存放到HashMap中,第2步是使⽤func()(此处是sum())函数对list(V)进⾏计算,得到最终结果

SparkShuffle机制-概念

两步聚合⽅案的优点是可以解决数据聚合问题,逻辑清晰、容易实现缺点是所有Shuffle的record都会先被存放在HashMap中,占⽤内存空间较⼤。另外,对于包含聚合函数的操作,如reduceByKey(func),需要先将数据聚合到HashMap中以后再执⾏func()聚合函数,效率较低.

优化⽅案:

对于reduceByKey(func)等包含聚合函数的操作来说,我们可以采⽤⼀种**在线聚合(Online aggregation)**的⽅法来减少内
存空间占⽤。如图所⽰,该⽅案在每个record加⼊HashMap时,同时进⾏func()聚合操作,并更新相应的聚合结果。具体地,对于每⼀个新来的<K,V>record,⾸先从HashMap中get出已经存在的结果Vʹ=HashMap.get(K),然后执⾏聚合函数得到新的中间结果Vʹʹ=func(V,Vʹ),最后将Vʹʹ写⼊HashMap中,即HashMap.put(K,Vʹʹ)。⼀般来说,聚合函数的执⾏结果会⼩于原始数据规模,即Size(func(list(V)))<Size(list(V)),如sum()、max()等,所以在线聚合可以减少内存消耗。在线聚合将Shuffle Read和聚合函数计算耦合在⼀起,可以加速计算。但是,对于不包含聚合函数的操作,如groupByKey()等,在线聚合和两步聚合没有差别,因为这些操作不包含聚合函数,⽆法减少中间数据规模

SparkShuffle机制-概念

2.3. 解决map()端combine问题

如何⽀持Shuffle Write端的combine功能。

  • 需要进⾏combine操作:

进⾏combine操作的⽬的是减少Shuffle的数据量,只有包含聚合函数的数据操作需要进⾏map
()端的combine,具体包括**reduceByKey()、foldByKey()、aggregateByKey()、combineByKey()、distinct()**等。对于不包含聚合函数的操作,如groupByKey(),我们即使进⾏了combine操作,也不能减少中间数据的规模。

  • combine解决⽅案:

从本质上讲,combine和Shuffle Read端的聚合过程没有区别,都是将<K,V>record聚合成<K,func(list(V))>,不同的是,Shuffle Read端聚合的是来⾃所有map task输出的数据,⽽combine聚合的是来⾃单⼀task输出的数据。因此仍然
可以采⽤Shuffle Read端基于HashMap的解决⽅案。具体地,⾸先利⽤HashMap进⾏combine,然后对HashMap中每⼀个record进⾏分区,输出到对应的分区⽂件中。

2.4. 解决sort问题

⽀持了Shuffle Write端的combine功能后,我们还要考虑如何⽀持数据排序功能。有些操作如sortByKey()、sortBy()需要将数据按照Key进⾏排序,那么如何在Shuffle机制中完成排序呢?

该问题包含以下两个⼦问题。
(1)在哪⾥执⾏sort?
⾸先,在Shuffle Read端必须执⾏sort,因为从每个task获取的数据组合起来以后不是全局按Key进⾏排序的。其次,理论上,在Shuffle Write端不需要排序,但如果进⾏了排序,那么Shuffle Read获取到(来⾃不同task)的数据是已经部分有序的数据,可以减少Shuffle Read端排序的复杂度。

(2)何时进⾏排序,即如何确定排序和聚合的顺序?
第1种⽅案是先排序再聚合:

这种⽅案需要先使⽤线性数据结构如Array,存储Shuffle Read的<K,V>record,然后对Key进⾏排序,排序后的数据可以直接从前到后进⾏扫描聚合,不需要再使⽤HashMap进⾏hash-based聚合。这种⽅案也是Hadoop MapReduce采⽤的⽅案,⽅案优点是既可以满⾜排序要求又可以满⾜聚合要求;缺点是需要较⼤内存空间来存储线性数据结构,同时排序和聚合过程不能同时进⾏,即不能使⽤在线聚合,效率较低。

第2种⽅案是排序和聚合同时进⾏ :

我们可以使⽤带有排序功能的Map,如TreeMap来对中间数据进⾏聚合,每次Shuffle Read获取到⼀个record,就将其放⼊TreeMap中与现有的record进⾏聚合,过程与HashMap类似,只是TreeMap⾃带排序功能。这种⽅案的优点是排序和聚合可以同时进⾏;缺点是相⽐HashMap,TreeMap的排序复杂度较⾼,TreeMap的插⼊时间复杂度是O(nlogn),⽽且需要不断调整树的结构,不适合数据规模⾮常⼤的情况。

第3种⽅案是先聚合再排序 :

即维持现有基于HashMap的聚合⽅案不变,将HashMap中的record或record的引⽤放⼊线性数据结构中进⾏排序。

这种⽅案的优点是聚合和排序过程独⽴,灵活性较⾼,⽽且之前的在线聚合⽅案不需要改动;缺点是需要复制(copy)数据或引
⽤,空间占⽤较⼤。Spark选择的是第3种⽅案,设计了特殊的HashMap来⾼效完成先聚合再排序的任务.

2.5. 解决内存不⾜问题

Shuffle数据量过⼤导致内存放不下怎么办?

由于我们使⽤HashMap对数据进⾏combine和聚合,在数据量⼤的时候,会出现内存溢出。

这个问题既可能出现在Shuffle Write阶段,又可能出现在Shuffle Read阶段。

解决⽅案:

使⽤内存+磁盘混合存储⽅案。

先在内存(如HashMap)中进⾏数据聚合,如果内存空间不⾜,则将内存中的数据spill到磁盘上,此时空闲出来的内存可以继续处理新的数据。此过程可以不断重复,直到数据处理完成。然⽽,问题是spill到磁盘上的数据实际上是部分聚合的结果,并没有和后续的数据进⾏过聚合。因此,为了得到完整的聚合结果,我们需要在进⾏下⼀步数据操作之前对磁盘上和内存中的数据进⾏再次聚合,这个过程我们称为“全局聚合”。为了加速全局聚合,我们需要将数据spill到磁盘上时进⾏排序,这样全局聚合才能够按顺序读取spill到磁盘上的数据,并减少磁盘I/O。

2.6. Spark中Shuffle框架的设计

SparkShuffle机制-概念

在Shuffle Write端,⽬前只⽀持combine功能,并不⽀持按Key排序功能。

三 .Shuffle Write框架设计和实现

在Shuffle Write阶段,数据操作需要分区、聚合和排序3个功能但每个数据操作只需要其中的⼀个或两个功能。

Spark为了⽀持所有的情况,设计了⼀个通⽤的Shuffle Write框架,框架的计算顺序为“map()输出→数据聚合→排序→分区”输出

SparkShuffle机制-概念
如上图所示, map task每计算出⼀个record及其partitionId,就将record放⼊类似HashMap的数据结构中进⾏聚合;聚合完成后,再将HashMap中的数据放⼊类似Array的数据结构中进⾏排序,既可按照partitionId,也可以按照partitionId+Key进⾏排序;最后根据partitionId将数据写⼊不同的数据分区中,存放到本地磁盘上。其中,聚合(aggregate,即combine)和排序(sort)过程是可选的,如果数据操作不需要聚合或者排序,那么可以去掉相应的聚合或排序过程

3.1. 不需要map()端聚合(combine)和排序

这种情况最简单,只需要实现分区功能。map()依次输出<K,V>record,并计算其partitionId(PID),Spark根据partitionId,将record依次输出到不同的buffer中,每当buffer填满就将record溢写到磁盘上的分区⽂件中。分配buffer的原因是map()输出record的速度很快,需要进⾏缓冲来减少磁盘I/O。在实现代码中,Spark将这种Shuffle Write⽅式称为BypassMergeSortShuffleWriter,即不需要
进⾏排序的Shuffle Write⽅式。

SparkShuffle机制-概念

该模式的优缺点:
优点是速度快,直接将record输出到不同的分区⽂件中。缺点是资源消耗过⾼,每个分区都需要⼀个buffer(⼤⼩由spark.Shuffle.file.buffer控制,默认为32KB),且同时需要建⽴多个分区⽂件进⾏溢写。当分区个数太⼤,如10 000时,每个map task需要约320MB的内存,会造成内存消耗过⼤,⽽且每个task需要同时建⽴和打开10000个⽂件,造成资源不⾜。
因此,该Shuffle⽅案适合分区个数较少的情况(<200)。

该模式适⽤的操作类型:
map()端不需要聚合(combine)、Key不需要排序且分区个数较(<=spark.Shuffle.sort.bypassMergeThres
hold,默认值为200)。例如,groupByKey(100),partitionBy(100),sortByKey(100)等。

3.2. 不需要map()端聚合(combine),但需要排序。

在这种情况下需要按照partitionId+Key进⾏排序。Spark采⽤的实现⽅法是建⽴⼀个Array来存放map()输出的record,并对Array中元素的Key进⾏精⼼设计,将每个<K,V>record转化为<(PID,K),V>record存储;然后按照partitionId+Key对record进⾏排序;最后将所有record写⼊⼀个⽂件中,通过建⽴索引来标⽰每个分区

SparkShuffle机制-概念
如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上,等待map()输出完以后,再将Array中
的record与磁盘上已排序的record进⾏全局排序,得到最终有序的record,并写⼊⽂件中。
该Shuffle模式被命名为SortShuffleWriter(KeyOrdering=true),使⽤的Array被命名为PartitionedPairBuffer。

该Shuffle模式的优缺点:
优点是只需要⼀个Array结构就可以⽀持按照partitionId+Key进⾏排序,Array⼤⼩可控,⽽且具有扩容和spill到磁盘上的功能,⽀持从⼩规模到⼤规模数据的排序。同时,输出的数据已经按照partitionId进⾏排序,因此只需要⼀个分区⽂件存储,即可标⽰不同的分区数据,克服了BypassMergeSortShuffleWriter中建⽴⽂件数过多的问题,适⽤于分区个数很⼤的情况。缺点是排序增加计算时延。

该Shuffle模式适⽤的操作:
map()端不需要聚合(combine)、Key需要排序、分区个数⽆限制。⽬前,Spark本⾝没有提供这种排序类型的数据操作,但不排除⽤户会⾃定义,或者系统未来会提供这种类型的操作。

sortByKey()操作虽然需要按Key进⾏排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进⾏排序。

3.3. 需要map()端聚合(combine),需要或者不需要按Key进⾏排序

需要实现按Key进⾏聚合(combine)的功能。

Spark采⽤的实现⽅法是建⽴⼀个类似HashMap的数据结构对map()输出的record进⾏聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是经过相同combine的聚合结果。

SparkShuffle机制-概念

combine()是sum()函数,那么Value中存放的是多个record对应的Value相加的结果。
聚合完成后,Spark对HashMap中的record进⾏排序。

SparkShuffle机制-概念
如果需要按Key进⾏排序,那么按partitionId+Key进⾏排序。最后,将排序后的record写⼊⼀个分区⽂件中。

如果HashMap存放不下,则会先扩容为两倍⼤⼩,如果还存放不下,就将HashMap中的record排序后spill到磁盘上
此时,HashMap被清空,可以继续对map()输出的record进⾏聚合,如果内存再次不够⽤,那么继续spill到磁盘上,此过程可以重复多次。
当map()输出完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进⾏再次聚合(merge),得到最终的record,输出到分区⽂件中 .

该Shuffle模式的优缺点:
优点是只需要⼀个HashMap结构就可以⽀持map()端的combine功能,HashMap具有扩容和spill到磁盘上的功能,⽀持⼩规模到⼤规模数据的聚合,也适⽤于分区个数很⼤的情况。在聚合后使⽤Array排序,可以灵活⽀持不同的排序需求。**缺点是在内存中进⾏聚合,内存消耗较⼤,需要额外的数组进⾏排序,⽽且如果有数据spill到磁盘上,还需要再次进⾏聚合。**在实现中,Spark在Shuffle Write端使⽤⼀个经过特殊设计和优化的HashMap,命名为PartitionedAppendOnlyMap,可以同时⽀持聚合和排序操作,相当于HashMap和Array的合体.

该Shuffle模式适⽤的操作:
适合map()端聚合(combine)、需要或者不需要按Key进⾏排序、分区个数⽆限制的应⽤,如reduceByKey()、aggregateByKey()等。

Shuffle Write框架需要执⾏的3个步骤是**“数据聚合→排序→分区”。**
如果应⽤中的数据操作不需要聚合,也不需要排序,⽽且分区个数很少,那么可以采⽤直接输出模式,即BypassMergeSortShuffleWriter。

为了克服BypassMergeSortShuffleWriter打开⽂件过多、buffer分配过多的缺点,也为了⽀持需要按Key进⾏排序的操作,Spark提供了SortShuffleWriter,使⽤基于Array的⽅法来按partitionId或partitionId+Key进⾏排序,只输出单⼀的分区⽂件即可。

最后,为了⽀持map()端combine操作,Spark提供了基于HashMap的SortShuffleWriter,将Array替换为类似HashMap的操作来⽀持聚合操作,在聚合后根据partitionId或partitionId+Key对record进⾏排序,并输出分区⽂件。

因为SortShuffleWriter按partitionId进⾏了排序,所以被称为sort-based Shuffle Write。

四 .Shuffle Read框架设计和实现

在Shuffle Read阶段,数据操作需要3个功能:跨节点数据获取、聚合和排序

SparkShuffle机制-概念
Spark为了⽀持所有的情况,设计了⼀个通⽤的Shuffle Read框架,框架的计算顺序为“数据获取→聚合→排序”输出

reduce task不断从各个map task的分区⽂件中获取数据(Fetch records),然后使⽤类似HashMap的结构来对数据进⾏聚合(aggregate),该过程是边获取数据边聚合。聚合完成后,将HashMap中的数据放⼊类似Array的数据结构中按照Key进⾏排序(sort by Key),最后将排序结果输出或者传递给下⼀个操作。如果不需要聚合或者排序,则可以去掉相应的聚合或排序过程。

SparkShuffle机制-概念

4.1. 不需要聚合,不需要按Key进⾏排序。

这种情况最简单,只需要实现数据获取功能即可。等待所有的map task结束后,reduce task开始不断从各个map task获取<K,V>record,并将record输出到⼀个buffer中(⼤⼩为spark.reducer.maxSizeInFlight=48MB),下⼀个操作直接从buffer中获取数据即可

SparkShuffle机制-概念
该Shuffle模式的优缺点:
优点是逻辑和实现简单,内存消耗很⼩。缺点是不⽀持聚合、排序等复杂功能。
该Shuffle模式适⽤的操作:适合既不需要聚合也不需要排序的应⽤,如partitionBy()等。

4.2. 不需要聚合,需要按Key进⾏排序

在这种情况下,需要实现数据获取和按Key排序的功能。
获取数据后,将buffer中的record依次输出到⼀个Array结构(PartitionedPairBuffer)中。由于这⾥采⽤了本来⽤于Shuffle Write端的Partit
ionedPairBuffer结构,所以还保留了每个record的partitionId。然后,对Array中的record按照Key进⾏排序,并将排序结果输出或者传递给下⼀步操作。

当内存⽆法存下所有的record时,PartitionedPairBuffer将record排序后spill到磁盘上,最后将内存中和磁盘上的record进⾏全局排序,得到最终排序后的record。

SparkShuffle机制-概念
该Shuffle模式的优缺点:
优点是只需要⼀个Array结构就可以⽀持按照Key进⾏排序,Array⼤⼩可控,⽽且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。

该Shuffle模式适⽤的操作:
适合reduce端不需要聚合,但需要按Key进⾏排序的操作,如sortByKey()、sortBy()等。

4.3. 需要聚合,不需要或需要按Key进⾏排序

在这种情况下,需要实现按照Key进⾏聚合,根据需要按Key进⾏排序的功能。
获取record后,Spark建⽴⼀个类似HashMap的数据结构(ExternalAppendOnlyMap)对buffer中的record进⾏聚合,HashMap中的Key是record中的Key,HashMap中的Value是经过相同聚合函数(func())计算后的结果。

聚合函数是sum()函数,那么Value中存放的是多个record对应Value相加后的结果。之后,如果需要按照Key进⾏排序,建⽴⼀个Array结构,读取HashMap中的record,并对record按Key进⾏排序,排序完成后,将结果输出或者传递给下⼀步操作。

SparkShuffle机制-概念
SparkShuffle机制-概念
如果HashMap存放不下,则会先扩容为两倍⼤⼩,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。

此时,HashMap被清空,可以继续对buffer中的record进⾏聚合。如果内存再次不够⽤,那么继续spill到磁盘上,此过程可以重复多次。当聚合完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进⾏再次聚合,得到最终的record,输出到分区⽂件中。

该Shuffle模式的优缺点:
优点是只需要⼀个HashMap和⼀个Array结构就可以⽀持reduce端的聚合和排序功能,HashMap 具有扩容和spill到磁盘上的功能,⽀持⼩规模到⼤规模数据的聚合。边获取数据边聚合,效率较⾼。缺点是需要在内存中进⾏聚合,内存消耗较⼤,如果有数据spill到磁盘上,还需要进⾏再次聚合。另外,经过HashMap聚合后的数据仍然需要拷贝到Array中进⾏排序,内存消耗较⼤。在实现中,Spark使⽤的HashMap是⼀个经过特殊优化的HashMap,命名为ExternalAppendOnlyMap,可以同时⽀持聚合和排序操作,相当于HashMap和Array的合体 .

该Shuffle模式适⽤的操作:
适合reduce端需要聚合、不需要或需要按Key进⾏排序的操作,如reduceByKey()、aggregateByKey()等。

Shuffle Read框架需要执⾏的3个步骤是**“数据获取→聚合→排序输出”**。
如果应⽤中的数据操作不需要聚合,也不需要排序,那么获取数据后直接输出。
对于需要按Key进⾏排序的操作,Spark 使⽤基于Array的⽅法来对Key进⾏排序。
对于需要聚合的操作,Spark提供了基于HashMap的聚合⽅法,同时可以再次使⽤Array来⽀持按照Key进⾏排序。
总体来讲,Shuffle Read框架使⽤的技术和数据结构与Shuffle Write过程类似,⽽且由于不需要分区,过程⽐Shuffle Write更为简单。当然,还有⼀些可优化的地⽅,如聚合和排序如何进⾏统⼀来减少内存copy和磁盘I/O等 .

来源:
Apache Spark 设计与实现 – 许利杰

上一篇:leetcode刷题(哈希表)


下一篇:实时监控、直播流、流媒体、视频网站开发方案流媒体服务器搭建及配置详解:使用nginx搭建rtmp直播、rtmp点播、,hls直播服务配置详解