spark的转换算子及一个案例

spark的转换算子


转换算子

map:同分区有序运行,不同分区无序运行

(在各个分区内取数据,功能强大,但是效率低)(:/1c37ca978ea74eae9f8c4258b0f9064f)

val result1: RDD[Int] = rdd.map(num => {
      println(num)
      num * 2
    })

mapPartitions:一次性取数一个分区,在分区内计算

效率高,但是只有全部计算完成的时候各分区的数据才会释放(因为对象的引用),内存小数据量大的场合下,可能会内存移除

val result2: RDD[Int] = rdd.mapPartitions(x => {
      x.map(_ * 2)})

golm:将一个分区的数据变成集合

val glomRdd: RDD[Array[Int]] = rdd.glom() 
    val maxRDD: RDD[Int] = glomRdd.map(
      data => data.max
    )
    println(maxRDD.collect().sum)

groupBy:讲数据源中的每一个数据进行key进行分

    val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
    def func1(string: String):Char = {
      string(0)
    }
    rdd.groupBy(func1).collect().foreach(println)

filter:过滤,返回布尔类型

    val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
    rdd.filter(_.startsWith("s")).collect().foreach(println)

sample

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7), 2)
    println(rdd.sample(
      false, // 抽取完是否放回,true为放回
      0.4, // 每一个数据被抽取的概率,就算大于1也不一定会被抽取
      1 //随机数种子,可以不设置
    ).collect().mkString(","))

distinct

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 3, 3, 4, 5, 6, 7), 2)
    // case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    rdd.distinct().collect().foreach(print)

coalesce: 缩减分区

(不选择shuffle可能后续会数据倾斜)

repartition:其实就是(coalesce(shuffle true))

可以增加分区,调用的coalesce,但是默认给shuffle=true

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    rdd .coalesce(2, true).saveAsTextFile("out")

sortBy: 会进行shuffle,默认不改变分区且升序

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "2"), (3, "4"), (2, "6")), 2)
    // 是经过shuffle的
    rdd.sortBy(_._2,false)
      .collect().foreach(println)

双value的操作,(交集,并集,差集,拉链)

intersection、union、subtract、zip
交集,并集,差集要求RDD的数据类型要一致;拉链可以不一致
    // 交集
    val rddInt: RDD[Int] = rdd1.intersection(rdd2)
    // 并集
    val rddUnion: RDD[Int] = rdd1.union(rdd2)
    // 差集
    val rddSub: RDD[Int] = rdd1.subtract(rdd2)
    // 拉链
    val rddZip: RDD[(Int, Int)] = rdd1.zip(rdd2)

partitionBy:

    val rddNew: RDD[(Int, Int)] = rdd.map((_, 1))
    // 自带hash分区,可重写
    val result: RDD[(Int, Int)] = rddNew.partitionBy(new HashPartitioner(2))
    result.saveAsTextFile("out")

groupByKey:不给参数,自动获取第一个为key

与groupBy的区别在于groupBy会保留完整的K-V对,groupByKey会提取出value

    val GroupRDD: RDD[(Int, Iterable[(Int, String)])] = rdd.groupBy(_._1)
    // (1,CompactBuffer((1,spark), (1,hello)))(3,CompactBuffer((3,scala)))(2,CompactBuffer((2,java)))

    val GroupByRDD: RDD[(Int, Iterable[String])] = rdd.groupByKey()
    // (1,CompactBuffer(spark, hello))(3,CompactBuffer(scala))(2,CompactBuffer(java))

补充:groupByKey 和 reduceByKey 的区别

shuffle角度:groupByKey是将数据打乱,会进行shuffle重新分区,可以再map进行统计
shuffle必须要落盘操作,不能在内存中等待
reduceByKey会在分区中预聚合(combine:类似MR的map端预聚合)。shuffle的数据就会变少一点;性能更高一点
功能角度:groupByKey是进行分组,再可以实现别的操作,功能更灵活,groupByKey是对相同key的value聚合,如果只是需要分组的话就不能使用

    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
    rdd.reduceByKey(_ + _).collect().foreach(println)
    rdd.groupByKey().map({ case (word, iter) => (word, iter.sum) }).collect().foreach(println)
  • aggregateByKey :两个参数列表。第一个参数列表为初始值;第二个参数列表两个参数:第一个参数为分区内操作,第二个参数为分区间操作
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 2)
    rdd.aggregateByKey(0)(math.max(_,_),_+_).collect.foreach(println)

foldByKey:分区内和分区间的计算规则一样时使用

相当于第二个参数列表两个参数相同的aggregateByKey

val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
    rdd.foldByKey(0)(_ + _).collect.foreach(println)

使用aggregateByKey求key的平均值

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)
    /**
     * 初始值的(0,0) 第一个0是表示“值”,第二个0表示出现分区键“值”的次数,实际每一次的运算都是将初始值更新。
     */
    val rdd1: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
      // 第一次运算时:x代表的是(0,0),y是("a", 1)中的 1,运算的结果就是第一个0+y得到第一此运算的“和”,第二个0+1,得到运算的次数;结果为(1,1)
      (init_tuple, value) => {
        (init_tuple._1 + value, init_tuple._2 + 1)
      },
      // 各个分区内的数据进行运算,“值”和“值”相加,次数和次数相加
      (part1, part2) => {
        (part1._1 + part2._1, part1._2 + part2._2)
      }
    )
    val rdd2: RDD[(String, Int)] = rdd1.mapValues({ case (x, y) => {
      x / y
    }
    })

可以用combineByKey代替:

	val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)

    /**
     * 需要三个参数
     * 第一个参数:将相同key的第一个数据进行结构转换
     * 第二个参数:分区内的计算规则
     * 第三个参数:分区间的计算规则
     */
    val rdd2: RDD[(String, (Int, Int))] = rdd.combineByKey(x => (x, 1),
      (init_tuple: (Int, Int, ), value) => {
        (init_tuple._1 + value, init_tuple._2 + 1)
      }
      , (part1: (Int, Int), part2: (Int, Int)) => {
        (part1._1 + part2._1, part1._2 + part2._2)
      }
    )
  • join、leftOuterJoin、rightOuterJoin,cogroup(这个不发散,会将两个rdd中的数据变成两个迭代器iter)

案例、不同省份的广告点击量前三的城市

// 时间戳	   省份 城市 用户 广告 
// 1635304263 广东 深圳 yan2 N
    val rdd: RDD[String] = sc.textFile("C:\\Users\\93134\\Desktop\\a.txt")
    //1635304263 广东 深圳 yan2 N
    val mapRdd: RDD[((String, String), Int)] = rdd.map(line => {
      val datas: Array[String] = line.split(" ")
      ((datas(1), datas(2)), 1)
    })

    // ((江西,南昌),115)
    val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)

    // (江西,(南昌,115))
    val mapRdd2: RDD[(String, (String, Int))] = reduceRdd.map({
      // 不建议用_的方式,用模式匹配可以直接使用
      case ((province, city), sum) => {
        (province, (city, sum))
      }
    })
    // (江西,CompactBuffer((江西,(吉安,104)), (江西,(赣州,121)), (江西,(南昌,115))))
    val groupRdd: RDD[(String, Iterable[(String, Int)])] = mapRdd2.groupByKey()

    // (江西,List((赣州,121), (南昌,115)))
    val resultRdd: RDD[(String, List[(String, Int)])] = groupRdd.mapValues(values => {
      values.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
    })
    resultRdd.collect().foreach(println)
    sc.stop()
上一篇:【Spark】【RDD】从HDFS创建RDD


下一篇:大数据学习(29)—— Spark Streaming