spark core 指标练习每个部门订单数最大的三个员工,每个作品对应性别的次数

每个部门订单数最大的三个员工

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @date :2021/7/9 11:15
  * @author :xiaotao
  * @description :topN
  *             部门id, 员工id, 订单数
  *              每个部门订单数最大的三个员工
  *              A,0001,22
  *              A,0002,55
  *              A,0003,11
  *              B,0005,88
  *              B,0006,75
  *              A,0007,152
  *              B,0008,65
  *              B,0009,88
  */
object Test1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf().setAppName("").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val value: RDD[String] = sc.textFile("D:data\\topN")

    //解析数据切分出每个字段
    val shuju: RDD[(String, (String, Int))] = value.map(l => {
      val bumenId: String = l.split(",")(0)
      val guid: String = l.split(",")(1)
      val orderNum: Int = l.split(",")(2).toInt
      (bumenId, (guid, orderNum))
    })

    //按照部门进行分组,返回每个部门对应一个迭代器
    val groupByBumen: RDD[(String, Iterable[(String, Int)])] = shuju.groupByKey()
    groupByBumen.foreach(println)
/*(B,CompactBuffer((0005,88), (0006,75), (0008,65), (0009,88)))
(A,CompactBuffer((0001,22), (0002,55), (0003,11), (0007,152)))*/
    println("--------------------------")

    //top3
    //然后取出每个迭代器中的按订单倒序排序的前三条数据作为一个数组和部门放入元组返回
    val bumenAndArray: RDD[(String, Array[(String, Int)])] = groupByBumen.map(l => {
      val bumenId: String = l._1
      //数组按照订单数进行倒序排序,然后取取前三条数据
      val top3: Array[(String, Int)] = l._2.toArray.sortWith(_._2 > _._2).take(3)
      (bumenId, top3)
    })

    //进行遍历取部门和员工数据输出
    bumenAndArray.foreach(m=>{
      val bumen: String = m._1
      for (elem <- m._2) {
        println(bumen,elem._1)
      }
    })

    sc.stop()
/*   (B,0005)
    (A,0007)
    (B,0009)
    (A,0002)
    (B,0006)
    (A,0001)*/
  }
}

每个作品对应性别的次数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @date :2021/6/17 15:49
  * @author :xiaotao
  * @description :
  *             作品id,用户,性别
  *              0001,张三,女
  *              0001,张三,女
  *              0001,青青,女
  *              0001,帆帆,男
  *              0001,赵六,男
  *              0001,赵六,男
  *              0001,赵六,男
  *              0001,李四,男
  *              0002,赵武,男
  *              0002,张三,女
  *              统计: 每个作品对应性别的次数(同一个作品对应的用户需要去重)
  *              目标字段: 作品id,性别,次数
  */
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("D:\\data\\a.txt")

    val maped: RDD[(String, String, String)] = lines.map(l => {
      val 商品id: String = l.split(",")(0)
      val 用户: String = l.split(",")(1)
      val 性别: String = l.split(",")(2)
      (商品id, 用户, 性别)
    })

    maped.distinct().map(l => ((l._1, l._3), 1)).reduceByKey(_ + _).map(l => l._1._1 + "," + l._1._2 + "," + l._2).foreach(println)
    /*
    0002,男,1
    0002,女,1
    0001,女,2
    0001,男,3
    */

    sc.stop()
  }
}

上一篇:实时流计算Spark Streaming原理介绍


下一篇:spark面试题-简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?