引言
我们知道在application中每存在一个action操作就会触发一个job,那么spark底层是怎样触发job的呢?接下来我们用一个wordcount程序来剖析一下job的触发机制。
解析wordcount源码
val lines = sc.textFile()
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
-
val counts = pairs.reduceByKey(_ + _)
其实RDD类中是没有reduceByKey方法的,但是对RDD调用该方法时,会触发scala的隐式转换,会在RDD类中找到rddToPairRDDFunctions()隐式转换,然后将RDD转换为PairRDDFunctions,接着会调用PairRDDFunctions类中的reduceByKey()方法。
new PairRDDFunctions(rdd)
在上面我们也可以看到reduceByKey操作是要先在本地做combine,然后再进行reducer分发的。
counts.foreach(count => println(count._1 + ": " + count._2))
连续调用好几个runJob方法后
最终调用SparkContext初始化时创建的DAGSchedule的runJob方法
至此我们知道每个action操作都会调用DAGSchedule的runJob方法来创建一个job!
下一篇文章我们将详解DAGSchedule的runJob方法并剖析DAGSchedule的stage划分原理及源码。