Accumulator
package com.shujia.spark.core

import java.lang

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object Demo21Accumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")

    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // A plain driver-side variable is copied into each task's closure,
    // so increments made inside foreach never reach the driver: this prints 0
    var j = 0
    rdd.foreach(i => {
      j += 1
    })
    println(j)

    /**
      * An accumulator can only be added to.
      * It is defined on the Driver,
      * added to on the Executors,
      * and read back on the Driver.
      */

    // Define the accumulator on the Driver side
    val accumulator: LongAccumulator = sc.longAccumulator

    rdd.foreach(i => {
      // Add to it on the Executor side
      accumulator.add(i)
    })

    // Read the accumulated result on the Driver
    val count: lang.Long = accumulator.value
    println(count)

    /**
      * A practical use of accumulators:
      *
      * Without an accumulator, counting the total number of students requires a separate job.
      * With an accumulator, the total and the per-class counts come out of the same job.
      */
    val student: RDD[String] = sc.textFile("data/students.txt")

    // Define the accumulator
    val studentNum: LongAccumulator = sc.longAccumulator

    val kvRDD: RDD[(String, Int)] = student.map(stu => {
      // Count every student while building the (class, 1) pairs
      studentNum.add(1)
      val clazz: String = stu.split(",")(4)
      (clazz, 1)
    })

    val clazzNumRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)

    clazzNumRDD.foreach(println)

    // Total number of students: read the accumulator only after an action has run
    // the map stage; reading it before the foreach would return 0 because
    // transformations are lazy
    val stuNum: lang.Long = studentNum.value
    println(stuNum)
  }
}
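Besides the built-in longAccumulator, Spark allows custom accumulators by extending AccumulatorV2. Below is a minimal sketch of a set-valued accumulator that collects the distinct class names seen on the Executors; the class name SetAccumulator and the object Demo21CustomAccumulator are illustrative names introduced here, not part of the original demo, and the file layout (class name in column 4 of data/students.txt) is assumed to match the code above.

package com.shujia.spark.core

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

// A set-valued accumulator: collects distinct String values added on the Executors
class SetAccumulator extends AccumulatorV2[String, Set[String]] {
  private val set: mutable.Set[String] = mutable.Set[String]()

  override def isZero: Boolean = set.isEmpty

  override def copy(): AccumulatorV2[String, Set[String]] = {
    val acc = new SetAccumulator
    acc.set ++= set
    acc
  }

  override def reset(): Unit = set.clear()

  override def add(v: String): Unit = set += v

  override def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    set ++= other.value

  override def value: Set[String] = set.toSet
}

object Demo21CustomAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark")
    val sc = new SparkContext(conf)

    // Register the custom accumulator on the Driver
    val clazzAcc = new SetAccumulator
    sc.register(clazzAcc, "clazzSet")

    val students = sc.textFile("data/students.txt")
    students.foreach(stu => {
      // Add on the Executors: record every class name that appears
      clazzAcc.add(stu.split(",")(4))
    })

    // Read on the Driver after the action has run
    println(clazzAcc.value)
  }
}

The same rules apply as for the built-in accumulator: define and register it on the Driver, add on the Executors, and read the value on the Driver only after an action has executed.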
Broadcast
package com.shujia.spark.core

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo22Broadcast {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")

    val sc = new SparkContext(conf)

    val students: RDD[String] = sc.textFile("data/students.txt")

    /*
    // Without a broadcast variable the list is shipped inside every task's closure
    val ids = List("1500100010", "1500100013", "1500100015", "1500100016")

    val filterRDD: RDD[String] = students.filter(student => {
      val id: String = student.split(",")(0)
      ids.contains(id)
    })

    filterRDD.foreach(println)
    */

    /**
      * Broadcast variable
      */
    val ids = List("1500100010", "1500100013", "1500100015", "1500100016")

    // 1. Broadcast the variable from the Driver side
    val broIds: Broadcast[List[String]] = sc.broadcast(ids)

    val filterRDD: RDD[String] = students.filter(student => {
      val id: String = student.split(",")(0)
      // Use the broadcast variable on the Executor side
      val value: List[String] = broIds.value
      value.contains(id)
    })

    filterRDD.foreach(println)

    /**
      * Applying broadcast variables: a map join.
      *
      * Load the small table into memory and do the join on the map side,
      * so no shuffle is needed.
      */
    val students1: RDD[String] = sc.textFile("data/students.txt")
    val scores: RDD[String] = sc.textFile("data/score.txt")

    /**
      * collect: pulls the RDD's data back into the Driver's memory
      */
    val list: Array[String] = students1.collect()

    val studentMap: Map[String, String] = list.map(stu => {
      val id: String = stu.split(",")(0)
      (id, stu)
    }).toMap

    // Broadcast the small table
    val broStudentMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)

    val stuCoInfo: RDD[String] = scores.map(sco => {
      val id: String = sco.split(",")(0)

      // Read the broadcast variable
      val value: Map[String, String] = broStudentMap.value

      // Look up the student's info in the broadcast map by id
      val studentInfo: String = value.getOrElse(id, "default value")

      studentInfo + "\t" + sco
    })

    stuCoInfo.foreach(println)
  }
}
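For contrast, the same student-score association can be produced with an ordinary join, which shuffles both RDDs by key across the network; this is exactly the cost the broadcast map join above avoids. The sketch below is illustrative: Demo22ShuffleJoin is a name introduced here, and the data files are assumed to have the same layout as in the demo above (student id in column 0).

package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo22ShuffleJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark")
    val sc = new SparkContext(conf)

    val students: RDD[String] = sc.textFile("data/students.txt")
    val scores: RDD[String] = sc.textFile("data/score.txt")

    // Key both tables by student id
    val stuKV: RDD[(String, String)] = students.map(stu => (stu.split(",")(0), stu))
    val scoKV: RDD[(String, String)] = scores.map(sco => (sco.split(",")(0), sco))

    // join is a shuffle operation: rows with the same key are moved to the same
    // partition over the network before they can be combined
    val joined: RDD[(String, (String, String))] = stuKV.join(scoKV)

    joined
      .map { case (_, (stuInfo, scoInfo)) => stuInfo + "\t" + scoInfo }
      .foreach(println)
  }
}

The broadcast map join is preferable when one side is small enough to fit in each Executor's memory; the shuffle join handles two large tables but pays the network cost of repartitioning both.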