SparkPi 代码剖析2 | 学习笔记

开发者学堂课程【大数据实时计算框架 Spark 快速入门SparkPi 代码剖析2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1677


SparkPi 代码剖析2


内容介绍:

一、SparkPi 代码剖析2

二、反射的方式来创建一个 TextInputFormat


一、SparkPi 代码剖析2

HadoopRDD 举例子

RDD.scala

236

final def partitions: Array[Partition]

239

partitions- getPartitions

HadoopRDD

149

override def getPartitions: Array[Partition]

185

val newInputFormat = ReflectionUtils.newInstance (inputFormatclass.asInstanceof[classl_]],co.asInstanceof[InputFormat[K, V]]


二、反射的方式来创建一个 TextInputFormat

inputFormatclass 哪里来的???

这个地方可以查看sc.textFile()

SparkContext,scala

def textFile(

832

hadoopFile(path,classof[TextInputFormat],classof[Longwritable],classof[Text],minPartitions). map (pair = > pair._ 2. tostring). setName (path)

1006

def hadoopFile[K, V](

1016

new HadoopRDD(

this,

confBroadcast,

Some (setInputPathsFunc),

inputFormatclass,

keyclass,

valueclass,

minPartitions). setName (path)

inputFormatclass这个参数传个HadoopRDD之后,我们是不是就可以回到刚刚HadoopRDD里面去了

HadoopRDD.scala

199

val inputsplits- inputFormat. getsplits (jobConf, minpartitions)

通过getsplits()这个MapReduce框架里面的方法来获取到path路径所对应的split的Hosts getsplits()方法,我们可以通过TextInputFormat找到它继承的FileInputFormat来找到方法hadoop代码里面的

FileInputFormat.class

210

public Inputsplit[] getsplits (JobConf job, int numSplits) throws IOException (DagScheduler.scala

1553

rdd.dependencies. foreach {

casen: NarrowDependency [_] =>

for (inPart <- n.getparents (partition))

val locs- getPreferredLocsInternal(n.rdd,inPart,

visited)

if (locs!- Nil) {

returnlocs

Case

如果一直找到这个 stage 里面的最开始 RDD ,还是没有最近计算位置

那就返回 Nil

1565

Nil

1021

val tasks: seq[Task[_]] = try {

这里面就是会封装 Tasks

1502

这里面最终会把 Tasks 封装成 Taskset 传递给下游的 TaskScheduler

taskscheduler.submitTasks(newTaskset(

tasks. toArray, stage. id, stage. latest Info. attemptid, jobId, properties))submitTasks

如果要跟进去就要找到具体的 taskschedulerImpl

SparkContext.scala

2592

private def createTaskscheduler(

具体得看你用什么模式来运行的

2602

case "local"-

val scheduler = new TaskschedulerImpl (sc, MAX_ LOCAL_ TASK_ FAILURES, islocal- true)

TaskschedulerImpl.scala

上一篇:Asp.net 2.0 WebPart使用经验点滴


下一篇:阿里云独立IP海外虚拟主机低至0.8元/天!不限流量还送域名?