Accumulator and Broadcast

Accumulator

package com.shujia.spark.core

import java.lang

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object Demo21Accumulator {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")


    val sc = new SparkContext(conf)


    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))


    // a plain driver-side variable does not work here: each task increments
    // its own deserialized copy of the closure, so the driver's j stays 0
    var j = 0

    rdd.foreach(i => {

      j += 1
    })

    println(j)


    /**
      *
      * accumulators support only addition
      * an accumulator can only be defined on the Driver
      * it can only be added to on the Executors
      * its value can only be read back on the Driver
      *
      */

    // define the accumulator on the Driver
    val accumulator: LongAccumulator = sc.longAccumulator

    rdd.foreach(i => {

      // add on the Executor side
      accumulator.add(i)
    })

    // read the accumulated result on the Driver
    val count: lang.Long = accumulator.value

    println(count)

    /**
      *
      * a practical use of accumulators
      *
      * without an accumulator, a separate job would be needed just to
      * count the total number of students; with one, the total and the
      * per-class counts are computed in a single pass
      *
      */

    val student: RDD[String] = sc.textFile("data/students.txt")

    // define the accumulator
    val studentNum: LongAccumulator = sc.longAccumulator

    val kvRDD: RDD[(String, Int)] = student.map(stu => {

      // count one student per record
      studentNum.add(1)

      val clazz: String = stu.split(",")(4)
      (clazz, 1)
    })

    val clazzNumRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)

    clazzNumRDD.foreach(println)

    // read the total number of students only after the action above has
    // run: RDDs are lazy, so before the job executes the accumulator is 0
    val stuNum: lang.Long = studentNum.value
    println(stuNum)


  }
}
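
One caveat the student-count demo glosses over: Spark guarantees exactly-once accumulator updates only inside actions such as foreach. The studentNum accumulator above is updated inside a map transformation, so the update runs once for every job (or task retry) that evaluates that lineage. A minimal sketch of the effect under the same local setup; the object name Demo21AccumulatorCaveat is illustrative, not part of the original code:

package com.shujia.spark.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

object Demo21AccumulatorCaveat {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")
    val sc = new SparkContext(conf)

    val acc: LongAccumulator = sc.longAccumulator

    // the add() lives inside a transformation, so it runs once for every
    // job that evaluates this lineage
    val mapped = sc.parallelize(1 to 9).map(i => {
      acc.add(1)
      i
    })

    mapped.count() // first job: acc = 9
    mapped.count() // second job recomputes the map stage: acc = 18

    // prints 18, not 9; cache() the RDD (or move the add into an action
    // such as foreach) to avoid the double counting
    println(acc.value)
  }
}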

Broadcast

package com.shujia.spark.core

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo22Broadcast {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")

    val sc = new SparkContext(conf)
    val students: RDD[String] = sc.textFile("data/students.txt")

    /* the same filter without a broadcast variable -- here the ids list
       would be serialized into every task's closure:

        val ids = List("1500100010", "1500100013", "1500100015", "1500100016")

        val filterRDD: RDD[String] = students.filter(student => {

          val id: String = student.split(",")(0)

          ids.contains(id)
        })

        filterRDD.foreach(println)
     */

    /**
      *
      * broadcast variables: ship one read-only copy of a value to each
      * Executor instead of one copy per task
      */

    val ids = List("1500100010", "1500100013", "1500100015", "1500100016")

    // 1. broadcast the variable from the Driver
    val broIds: Broadcast[List[String]] = sc.broadcast(ids)

    val filterRDD: RDD[String] = students.filter(student => {

      val id: String = student.split(",")(0)

      // use the broadcast variable on the Executor
      val value: List[String] = broIds.value
      value.contains(id)
    })

    filterRDD.foreach(println)

    /**
      * an application of broadcast variables
      *
      * implementing a map join: load the small table into memory and
      * perform the join on the map side, avoiding a shuffle
      *
      */

    val students1: RDD[String] = sc.textFile("data/students.txt")
    val scores: RDD[String] = sc.textFile("data/score.txt")

    /**
      *
      * collect: pulls the RDD's data into the Driver's memory, so it
      * should only be used when the data fits on the Driver
      *
      */
    val list: Array[String] = students1.collect()

    val studentMap: Map[String, String] = list.map(stu => {
      val id: String = stu.split(",")(0)
      (id, stu)
    }).toMap

    // broadcast the small table
    val broStudentMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)


    val stuCoInfo: RDD[String] = scores.map(sco => {
      val id: String = sco.split(",")(0)

      // read the broadcast variable
      val value: Map[String, String] = broStudentMap.value

      // use the id to look the student up in the broadcast map
      val studentInfo: String = value.getOrElse(id, "unknown")

      studentInfo + "\t" + sco
    })

    stuCoInfo.foreach(println)


  }

}
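
A small follow-up on broadcast variables: once the jobs that use one have finished, the executor-side copies can be released with unpersist(), or the variable can be dropped for good with destroy(). A minimal sketch under the same local setup (the object name Demo22BroadcastCleanup is illustrative):

package com.shujia.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object Demo22BroadcastCleanup {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("spark")
    val sc = new SparkContext(conf)

    val broIds = sc.broadcast(List("1500100010", "1500100013"))

    sc.parallelize(List("1500100010", "1500100099"))
      .filter(id => broIds.value.contains(id))
      .foreach(println)

    // unpersist() drops the cached copies on the executors; the variable
    // is re-broadcast lazily if it is used again
    broIds.unpersist()

    // destroy() releases all state permanently; reading broIds.value
    // after this throws a SparkException
    broIds.destroy()
  }
}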

