Spark面试

1. Spark消费 Kafka,分布式的情况下,如何保证消息的顺序?

Kafka 分布式的单位是 Partition。如何保证消息有序,需要分几个情况讨论。

  • 同一个 Partition 用一个 write ahead log 组织,所以可以保证 FIFO 的顺序。

  • 不同 Partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 Partition。比如说 key 是 user id,table row id 等等,所以同一个 user 或者同一个 record 的消息永远只会发送到同一个 Partition上,保证了同一个 user 或 record 的顺序。

  • 当然,如果你有 key skewness 就有些麻烦,需要特殊处理。

 

实际情况中: (1)不关注顺序的业务大量存在;(2)队列无序不代表消息无序。

第(2)条的意思是说: 我们不保证队列的全局有序,但可以保证消息的局部有序。举个例子: 保证来自同1个 order id 的消息,是有序的!

Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton 和 key 是可选的。如果你指定了 partition,那就是所有消息发往同1个 partition,就是有序的。并且在消费端,Kafka 保证,1个 partition 只能被1个 consumer 消费。或者你指定 key(比如 order id),具有同1个 key 的所有消息,会发往同1个 partition。也是有序的。

2. 对于 Spark 中的数据倾斜问题你有什么好的方案?

简单一句: Spark 数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义 Partitioner,使用 Map 侧 Join 代替 Reduce 侧 Join(内存表合并),给倾斜 Key 加上随机前缀等。

什么是数据倾斜 对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈(木桶效应)。

 

 

具体解决方案 :

1. 调整并行度分散同一个 Task 的不同 Key: Spark 在做 Shuffle 时,默认使用 HashPartitioner对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。图中左边绿色框表示 kv 样式的数据,key 可以理解成 name。可以看到 Task0 分配了许多的 key,调整并行度,多了几个 Task,那么每个 Task 处理的数据量就分散了。

2. 自定义Partitioner: 使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task,可以拿上图继续想象一下,通过自定义 Partitioner 可以把原本分到 Task0 的 Key 分到 Task1,那么 Task0 的要处理的数据量就少了。 

3. 将 Reduce side(侧) Join 转变为 Map side(侧) Join: 通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。可以看到 RDD2 被加载到内存中了。

4. 为 skew 的 key 增加随机前/后缀: 为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题。Join 另一则的数据中,与倾斜 Key 对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。

5. 大表随机添加 N 种随机前缀,小表扩大 N 倍: 如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大(很难一个 Key 一个 Key 都加上后缀)。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍),可以看到 RDD2 扩大了 N 倍了,再和加完前缀的大数据做笛卡尔积。

3. 你所理解的 Spark 的 shuffle 过程?

Spark shuffle 处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。

 

7. RDD, DAG, Stage怎么理解?

DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。DAG 在 Spark 中的对应的实现为 DAGScheduler。

RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。Rdd的五个特征:1. dependencies: 建立 RDD 的依赖关系,主要 RDD 之间是宽窄依赖的关系,具有窄依赖关系的 RDD 可以在同一个 stage 中进行计算。2. partition: 一个 RDD 会有若干个分区,分区的大小决定了对这个 RDD 计算的粒度,每个 RDD 的分区的计算都在一个单独的任务中进行。3. preferedlocations: 按照“移动数据不如移动计算”原则,在 Spark 进行任务调度的时候,优先将任务分配到数据块存储的位置。4. compute: Spark 中的计算都是以分区为基本单位的,compute 函数只是对迭代器进行复合,并不保存单次计算的结果。5. partitioner: 只存在于(K,V)类型的 RDD 中,非(K,V)类型的 partitioner 的值就是 None。

RDD 的算子主要分成2类,action 和 transformation。这里的算子概念,可以理解成就是对数据集的变换。action 会触发真正的作业提交,而 transformation 算子是不会立即触发作业提交的。每一个 transformation 方法返回一个新的 RDD。只是某些 transformation 比较复杂,会包含多个子 transformation,因而会生成多个 RDD。这就是实际 RDD 个数比我们想象的多一些 的原因。通常是,当遇到 action 算子时会触发一个job的提交,然后反推回去看前面的 transformation 算子,进而形成一张有向无环图。

Stage 在 DAG 中又进行 stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 stage 又可以划分成若干 task。接下来的事情就是 driver 发送 task 到 executor,executor 自己的线程池去执行这些 task,完成之后将结果返回给 driver。action 算子是划分不同 job 的依据。

 

10. Job 和 Task 怎么理解

Job Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。

Task 一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。看一看图就明白了,可以数一数每个 Stage 有多少个 Task。

 

12. 任务的概念

包含很多 task 的并行计算,可以认为是 Spark RDD 里面的 action,每个 action 的计算会生成一个 job。用户提交的 job 会提交给 DAGScheduler,job 会被分解成 Stage 和 Task。

 

 一句话说说 Spark Streaming 是如何收集和处理数据的

在 Spark Streaming 中,数据采集是逐条进行的,而数据处理是按批 mini batch进行的,因此 Spark Streaming 会先设置好批处理间隔 batch duration,当超过批处理间隔就会把采集到的数据汇总起来成为一批数据交给系统去处理。

 

解释一下窗口间隔window duration和滑动间隔slide duration

Spark面试

  1. 红色的矩形就是一个窗口,窗口 hold 的是一段时间内的数据流。

  2. 这里面每一个 time 都是时间单元,在官方的例子中,每隔 window size 是3 time unit, 而且每隔2个单位时间,窗口会 slide 一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)slide interval - The interval at which the window-based operation is performed (2 in the figure). 
  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的 cron 表达式吧。

窗口间隔一般大于(批处理间隔、滑动间隔)。这都是理解窗口操作的关键。

 

简单描述一下Spark Streaming的容错原理

Spark Streaming 的一个特点就是高容错。

首先 Spark RDD 就有容错机制,每一个 RDD 都是不可变的分布式可重算的数据集,其记录这确定性的操作血统,所以只要输入数据是可容错的,那么任意一个 RDD 的分区出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算出来的。

预写日志通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加操作中出现了异常,可以通过读取日志文件并重新施加该操作。

另外接收数据的正确性只在数据被预写到日志以后接收器才会确认,已经缓存但还没保存的数据可以在 Driver 重新启动之后由数据源再发送一次,这两个机制确保了零数据丢失,所有数据或者从日志中恢复,或者由数据源重发。

 

StreamingContext启动时序图吗

  1. 初始化 StreamingContext 中的 DStreamGraph 和 JobScheduler,进而启动 JobScheduler 的 ReceiveTracker 和 JobGenerator。

  2. 初始化阶段会进行成员变量的初始化,重要的包括 DStreamGraph(包含 DStream 之间相互依赖的有向无环图),JobScheduler(定时查看 DStreamGraph,然后根据流入的数据生成运行作业),StreamingTab(在 Spark Streaming 运行的时候对流数据处理的监控)。

  3. 然后就是创建 InputDStream,接着就是对 InputDStream 进行 flatMap, map, reduceByKey, print 等操作,类似于 RDD 的转换操作。

  4. 启动 JobScheduler,实例化并启动 ReceiveTracker 和 JobGenerator。

  5. 启动 JobGenerator

  6. 启动 ReceiverTracker

 

Spark内核调度

Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。
​Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。


Job执行是DAG图

Spark面试

运行词频统计WordCount,截取4040监控页面上DAG图

Spark面试

当RDD调用Action函数(Job触发函数)时,产出1个Job,执行Job。

1、将Job中所有RDD按照依赖关系构建图:DAG图(有向无环图)

2、将DAG图划分为Stage阶段,分为2种类型

ResultStage,对结果RDD进行处理Stage阶段
ShuffleMapStage,此Stage阶段中最后1个RDD产生Shuffle
3、每个Stage中至少有1个RDD或多个RDD,每个RDD有多个分区,每个分区数据被1个Task处理

每个Stage中有多个Task处理数据,每个Task处理1个分区数据


内核调度之RDD 依赖

RDD 间存在着血统继承关系,其本质上是 RDD之间的依赖(Dependency)关系
[每个RDD记录,如何从父RDD得到的,调用哪个转换函数]

Spark面试

从DAG图上来看,RDD之间依赖关系存在2种类型:

Spark面试

 

内核调度之DAG和Stage

在Spark应用执行时,每个Job执行时(RDD调用Action函数时),依据最后一个RDD(调用Action函数RDD),依据RDD依赖关系,向前推到,构建Job中所有RDD依赖关系图,称之为DAG图。
Spark面试

当构建完成Job DAG图以后,继续从Job最后一个RDD开始,依据RDD之间依赖关系,将DAG图划分为Stage阶段,当RDD之间依赖为Shuffle依赖时,划分一个Stage。

对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage;
对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分。

Spark面试

把DAG划分成互相依赖的多个Stage,划分依据是RDD之间的宽依赖,Stage是由一组并行的Task组成。

 

内核调度之Spark Shuffle

 Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。

​ Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。

Spark面试

Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

Spark面试

Stage划分为2种类型:

1)ShuffleMapStage,在Spark 1个Job中,除了最后一个Stage之外,其他所有的Stage都是此类型;将Shuffle数据写入到本地磁盘,ShuffleWriter在此Stage中,所有的Task称为:ShuffleMapTask
2)、ResultStage,在Spark的1个Job中,最后一个Stage,对结果RDD进行操作会读取前一个Stage中数据,ShuffleReader在此Stage中,所有的Task任务称为ResultTask。
[ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask ]

 

内核调度之Job 调度流程

当启动Spark Application的时候,运行MAIN函数,首先创建SparkContext对象(构建DAGScheduler和TaskScheduler)。

第一点、DAGScheduler实例对象

将每个Job的DAG图划分为Stage,依据RDD之间依赖为宽依赖(产生Shuffle)

第二点、TaskScheduler实例对象

调度每个Stage中所有Task:TaskSet,发送到Executor上执行每个Stage中会有多个Task,所有Task处理数据不一样(每个分区数据被1个Task处理),但是处理逻辑一样的。
将每个Stage中所有Task任务,放在一起称为TaskSet。

Spark面试

当RDD调用Action函数(比如count、saveTextFile或foreachPartition)时,触发一个Job执行,调度中流程如下图所示:

Spark面试

​ Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。
1)DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。

2)TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。

Spark面试

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。
一个Spark应用程序包括Job、Stage及Task:
第一、Job是以Action方法为界,遇到一个Action方法则触发一个Job;
第二、Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
第三、Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
 

内核调度之并行度

在Spark Application运行时,并行度可以从两个方面理解:

  • 1)、资源的并行度:由节点数(executor)和cpu数(core)决定的
  • 2)、数据的并行度:task的数据,partition大小

Task数目要是core总数的2-3倍为佳

 

 

 

 

 

 

 

 

 

 

 

上一篇:vue中this.$router.push()路由传值和获取的两种常见方法


下一篇:Hadoop复习第七章-spark