开发者学堂课程【大数据实时计算框架 Spark 快速入门:SparkPi 代码剖析2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/100/detail/1677
SparkPi 代码剖析2
内容介绍:
一、SparkPi 代码剖析2
二、反射的方式来创建一个 TextInputFormat
一、SparkPi 代码剖析2
HadoopRDD 举例子
RDD.scala
236
final def partitions: Array[Partition]
239
partitions- getPartitions
HadoopRDD
149
override def getPartitions: Array[Partition]
185
val newInputFormat = ReflectionUtils.newInstance (inputFormatclass.asInstanceof[classl_]],co.asInstanceof[InputFormat[K, V]]
二、反射的方式来创建一个 TextInputFormat
inputFormatclass 哪里来的???
这个地方可以查看sc.textFile()
SparkContext,scala
def textFile(
832
hadoopFile(path,classof[TextInputFormat],classof[Longwritable],classof[Text],minPartitions). map (pair = > pair._ 2. tostring). setName (path)
1006
def hadoopFile[K, V](
1016
new HadoopRDD(
this,
confBroadcast,
Some (setInputPathsFunc),
inputFormatclass,
keyclass,
valueclass,
minPartitions). setName (path)
inputFormatclass这个参数传个HadoopRDD之后,我们是不是就可以回到刚刚HadoopRDD里面去了
HadoopRDD.scala
199
val inputsplits- inputFormat. getsplits (jobConf, minpartitions)
通过getsplits()这个MapReduce框架里面的方法来获取到path路径所对应的split的Hosts getsplits()方法,我们可以通过TextInputFormat找到它继承的FileInputFormat来找到方法hadoop代码里面的
FileInputFormat.class
210
public Inputsplit[] getsplits (JobConf job, int numSplits) throws IOException (DagScheduler.scala
1553
rdd.dependencies. foreach {
casen: NarrowDependency [_] =>
for (inPart <- n.getparents (partition))
val locs- getPreferredLocsInternal(n.rdd,inPart,
visited)
if (locs!- Nil) {
returnlocs
Case
如果一直找到这个 stage 里面的最开始 RDD ,还是没有最近计算位置
那就返回 Nil
1565
Nil
1021
val tasks: seq[Task[_]] = try {
这里面就是会封装 Tasks
1502
这里面最终会把 Tasks 封装成 Taskset 传递给下游的 TaskScheduler
taskscheduler.submitTasks(newTaskset(
tasks. toArray, stage. id, stage. latest Info. attemptid, jobId,
propert
ies))submitTasks
如果要跟进去就要找到具体的 taskschedulerImpl
SparkContext.scala
2592
private def createTaskscheduler(
具体得看你用什么模式来运行的
2602
case "local"-
val scheduler = new TaskschedulerImpl (sc, MAX_ LOCAL_ TASK_ FAILURES, islocal- true)
TaskschedulerImpl.scala