RDD Operators

RDD Transformation Operators

RDD Transformation Operators - map

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - map
    val rdd =sc.makeRDD(List(1,2,3,4))
    //1,2,3,4
    //2,4,6,8
    //transformation function
    def mapFunction(num:Int):Int={
      num * 2
    }
//    val mapRDD:RDD[Int]=rdd.map(mapFunction)
//    val mapRDD:RDD[Int]=rdd.map((num:Int)=>{num*2})
//    val mapRDD:RDD[Int]=rdd.map((num:Int)=>num*2)
//    val mapRDD:RDD[Int]=rdd.map((num)=>num*2)
//    val mapRDD:RDD[Int]=rdd.map(num=>num*2)
    val mapRDD:RDD[Int]=rdd.map(_*2)
    mapRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - map - Parallel Execution Demo

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Par {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - map
    //1. Within a partition, an RDD processes its data one record at a time:
    //   only after all the logic for the current record has finished does the next record start.
    //   Execution within a partition is therefore ordered.
    val rdd =sc.makeRDD(List(1,2,3,4),1)
    val mapRDD=rdd.map(
      num=>{
        println(">>>>>>>>>>"+num)
        num
      }
    )
    val mapRDD1=rdd.map(
      num=>{
        println("##########"+num)
        num
      }
    )
    mapRDD.collect()
    mapRDD1.collect()
    sc.stop()
  }

}
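
For contrast (my own variation, not part of the original notes): if the same demo is given 2 partitions, the partitions are processed by different tasks in parallel, so the printed order may interleave across partitions (for example 3 may appear before 2), while the order inside each partition is still preserved.

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Par2 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //2 partitions: [1,2] and [3,4] are handled by separate tasks running in parallel
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    rdd.map(
      num=>{
        println(">>>>>>>>>>"+num)
        num
      }
    ).collect()
    sc.stop()
  }

}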

RDD Transformation Operators - mapPartitions

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - mapPartitions
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    //mapPartitions transforms the data one whole partition at a time,
    //but it loads the entire partition into memory and holds a reference to it.
    //Data that has already been processed is not released while that reference exists,
    //so with limited memory and a large data volume it can easily cause memory overflow (OOM).
    val mpRDD:RDD[Int]=rdd.mapPartitions(
      iter=>{
        println(">>>>>>>>>>>>>>")
        iter.map(_*2)
      }
    )
    mpRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - mapPartitions - Exercise

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - mapPartitions
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    //exercise: return the maximum value of each partition
    //[1,2],[3,4] => [2],[4]
    val mpRDD=rdd.mapPartitions(
      iter=>{
        List(iter.max).iterator
      }
    )
    mpRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - Difference Between mapPartitions and map

1. map(): processes one element at a time.
2. mapPartitions(): processes one partition at a time; the data of that partition in the source RDD can only be released after the whole partition has been processed, which may lead to OOM.
3. Development guidance: when memory is plentiful, mapPartitions() is recommended for better processing efficiency (see the sketch below).
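
A rough illustration of this guidance (my own sketch, not part of the original course code): the per-element cost is simulated here by constructing a java.text.SimpleDateFormat for every record, which mapPartitions lets you pay only once per partition. The object name and the simulated cost are made up for this example.

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Map_Vs_MapPartitions {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1577836800000L,1577923200000L,1578009600000L,1578096000000L),2)

    //map: the (simulated) expensive object is built once per element, i.e. 4 times here
    val perElement=rdd.map(
      ts=>{
        val sdf=new SimpleDateFormat("yyyy-MM-dd")
        sdf.format(new Date(ts))
      }
    )

    //mapPartitions: the expensive object is built once per partition, i.e. 2 times here
    val perPartition=rdd.mapPartitions(
      iter=>{
        val sdf=new SimpleDateFormat("yyyy-MM-dd")
        iter.map(ts=>sdf.format(new Date(ts)))
      }
    )

    perElement.collect().foreach(println)
    perPartition.collect().foreach(println)
    sc.stop()
  }

}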

RDD Transformation Operators - mapPartitionsWithIndex

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - mapPartitionsWithIndex
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    //[1,2],[3,4]
    //keep only the data of the second partition (index 1)
    val mpiRDD=rdd.mapPartitionsWithIndex(
      (index,iter)=>{
        if(index==1){
          iter
        }else{
          Nil.iterator
        }
      }

    )
    mpiRDD.collect().foreach(println)
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - mapPartitionsWithIndex
    //show which partition each element belongs to
    val rdd=sc.makeRDD(List(1,2,3,4))
    val mpiRDD=rdd.mapPartitionsWithIndex(
      (index,iter)=>{
        //1,2,3,4
        //(0,1)(2,2),(4,3),(6,4)
        iter.map(
          num=>{
            (index,num)
          }
        )
      }

    )
    mpiRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - flatMap

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - flatMap
    val rdd:RDD[List[Int]]=sc.makeRDD(List(
      List(1,2),List(3,4)
    ))
    val flatRDD:RDD[Int]=rdd.flatMap(
      list=>{
        list
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - flatMap
    val rdd:RDD[String]=sc.makeRDD(List(
      "Hello Scala","Hello Spark"
    ))
    val flatRDD:RDD[String]=rdd.flatMap(
      s=>{
        s.split(" ")
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - flatMap - Exercise

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - flatMap
    val rdd=sc.makeRDD(List(
      List(1,2),3,List(4,5)
    ))
    val flatRDD=rdd.flatMap(
      data=>{
        data match{
          case list:List[_]=>list
          case dat=>List(dat)
        }
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - glom

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - glom
    val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
    //List=>Int
    //Int=>Array
    val glomRDD:RDD[Array[Int]]=rdd.glom()
    glomRDD.collect().foreach(data=>println(data.mkString(",")))
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - glom
    val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
    //[1,2],[3,4]
    //[2],[4]  take the maximum of each partition
    //[6]      sum the maxima
    val glomRDD:RDD[Array[Int]]=rdd.glom()
    val maxRDD:RDD[Int]=glomRDD.map(
      array=>{
        array.max
      }
    )
    println(maxRDD.collect().sum)
    sc.stop()
  }

}

RDD Transformation Operators - Understanding What "Partitions Stay Unchanged" Means

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Part {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - map
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    //[1,2],[3,4]  map keeps every element in its original partition: compare output and output1
    rdd.saveAsTextFile("output")
    val mapRDD=rdd.map(_*2)
    //[2,4],[6,8]
    mapRDD.saveAsTextFile("output1")
    sc.stop()
  }

}

RDD Transformation Operators - groupBy

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - groupBy
    val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
    //groupBy evaluates the grouping function for every element of the data source
    //and groups by the returned key: elements with the same key are placed in the same group
    def groupFunction(num:Int):Int={
      num % 2
    }
    val groupRDD=rdd.groupBy(groupFunction)
    groupRDD.collect().foreach(println)
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - groupBy
    val rdd=sc.makeRDD(List("Hello","Spark","Hello","Hadoop"),2)
    //grouping has no inherent relationship with partitioning
    val groupRDD=rdd.groupBy(_.charAt(0))
    groupRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - groupBy - Exercise

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - groupBy
    val rdd=sc.textFile("datas/apache.log")
    val timeRDD=rdd.map(
      line=>{
        val datas=line.split(" ")
        val time=datas(3)
        //time.substring(0, )
        val sdf=new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val date:Date=sdf.parse(time)
        val sdf1=new SimpleDateFormat("HH")
        val hour:String=sdf1.format(date)
        (hour,1)
      }
    ).groupBy(_._1)
    timeRDD.map {
      case (hour, iter) => {
        (hour, iter.size)
      }
    }.collect.foreach(println)

    sc.stop()
  }

}

RDD Transformation Operators - filter

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - filter
    val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4))
    val filterRDD:RDD[Int]=rdd.filter(num=>num%2!=0)
    filterRDD.collect().foreach(println)
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - filter
    val rdd=sc.textFile("datas/apache.log")
    rdd.filter(
      line=>{
        val datas=line.split(" ")
        val time=datas(3)
        time.startsWith("17/05/2015")

      }
    ).collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - sample

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark08_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - sample
    val rdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
    //sample takes three parameters:
    //1. the first parameter: whether sampled data is put back, true (with replacement), false (without replacement)
    //2. the second parameter:
    //     without replacement: the probability that each element of the data source is selected
    //     with replacement: the expected number of times each element may be selected
    //   (conceptually a threshold compared against a random value)
    //3. the third parameter: the seed of the random number generator used when sampling
    //   if it is not passed, the current system time is used
//    println(rdd.sample(
//      false,
//      0.4,
//      1
//    ).collect().mkString(","))
    println(rdd.sample(
      true,
      2
//      ,1
    ).collect().mkString(","))

    sc.stop()
  }

}

RDD Transformation Operators - distinct

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - distinct
    val rdd=sc.makeRDD(List(1,2,3,4,1,2,3,4))
    val rdd1:RDD[Int]=rdd.distinct()
    rdd1.collect().foreach(println)

    sc.stop()
  }

}
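
For reference (my own note; worth checking against the Spark version you use): distinct is usually explained, and as far as I know implemented, as a map + reduceByKey + map pipeline rather than an in-memory set, which is why it involves a shuffle. A minimal sketch of that idea, under that assumption:

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_Operator_Transform_DistinctIdea {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4,1,2,3,4))

    //pair each element with null, collapse duplicates per key, then drop the null
    val dedup:RDD[Int]=rdd
      .map(x=>(x,null))
      .reduceByKey((x,_)=>x)
      .map(_._1)

    dedup.collect().foreach(println)
    sc.stop()
  }

}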

RDD Transformation Operators - coalesce

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark10_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - coalesce
    val rdd=sc.makeRDD(List(1,2,3,4,5,6),3)
    //By default coalesce does not shuffle the data when merging partitions,
    //so shrinking the partition count this way may leave the partitions unbalanced (data skew).
    //If you want the data balanced, enable shuffle.
//    val newRDD=rdd.coalesce(2)
    val newRDD=rdd.coalesce(2,true)
    newRDD.saveAsTextFile("output")

    sc.stop()
  }

}

RDD Transformation Operators - repartition

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - repartition
    val rdd=sc.makeRDD(List(1,2,3,4,5,6),2)
    //coalesce can be asked to increase the number of partitions, but without shuffle it has no effect,
    //so increasing the partition count requires a shuffle.
    //Spark provides a simplified pair of operations:
    //  shrink partitions: coalesce (enable shuffle if the data should stay balanced)
    //  increase partitions: repartition, which internally calls coalesce with shuffle always enabled
//    val newRDD=rdd.coalesce(3,true)
    val newRDD=rdd.repartition(3)
    newRDD.saveAsTextFile("output")

    sc.stop()
  }

}
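
To see this in action, a small check (the object name is mine, not from the course): without shuffle coalesce cannot grow the partition count, while coalesce with shuffle, and therefore repartition, can.

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_Operator_Transform_Check {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4,5,6),2)

    //without shuffle, coalesce cannot increase the number of partitions
    println(rdd.coalesce(3).getNumPartitions)       //still 2
    //with shuffle (what repartition does internally), it can
    println(rdd.coalesce(3,true).getNumPartitions)  //3
    println(rdd.repartition(3).getNumPartitions)    //3

    sc.stop()
  }

}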

RDD Transformation Operators - sortBy

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - sortBy
    val rdd=sc.makeRDD(List(6,2,4,5,3,1),2)
    val newRDD:RDD[Int]=rdd.sortBy(num=>num)
    newRDD.saveAsTextFile("output")

    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - sortBy
    val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)),2)
    //sortBy sorts the data source by the given key; the order is ascending by default,
    //and the second parameter can reverse it.
    //sortBy does not change the number of partitions, but there is a shuffle in the middle.
    val newRDD=rdd.sortBy(t=>t._1.toInt,false)
    newRDD.collect().foreach(println)

    sc.stop()
  }

}

RDD Transformation Operators - Intersection, Union, Subtract & Zip

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - double-Value operators
    val rdd1=sc.makeRDD(List(1,2,3,4))
    val rdd2=sc.makeRDD(List(3,4,5,6))
    //intersection [3,4]
    val rdd3:RDD[Int]=rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))
    //union [1,2,3,4,5,6]
    val rdd4:RDD[Int]=rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))
    //difference [1,2]
    val rdd5:RDD[Int]=rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))
    //zip [(1,3),(2,4),(3,5),(4,6)]
    val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))
    sc.stop()
  }

}

RDD Transformation Operators - Intersection, Union, Subtract & Zip - Caveats

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - double-Value operators
    //intersection, union and subtract require both data sources to have the same element type;
    //zip allows the two data sources to have different element types
    val rdd1=sc.makeRDD(List(1,2,3,4))
    val rdd2=sc.makeRDD(List(3,4,5,6))
    val rdd7=sc.makeRDD(List("3","4","5","6"))
    //intersection [3,4]
    val rdd3:RDD[Int]=rdd1.intersection(rdd2)
//    val rdd8:RDD[Int]=rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))
    //union [1,2,3,4,5,6]
    val rdd4:RDD[Int]=rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))
    //difference [1,2]
    val rdd5:RDD[Int]=rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))
    //zip [(1,3),(2,4),(3,5),(4,6)]
    val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
//    val rdd8:RDD[(Int,String)]=rdd1.zip(rdd7)
    println(rdd6.collect().mkString(","))
    sc.stop()
  }

}

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - double-Value operators
    //Can't zip RDDs with unequal numbers of partitions: List(2, 4)
    //zip requires both data sources to have the same number of partitions

//    val rdd1=sc.makeRDD(List(1,2,3,4),2)
//    val rdd2=sc.makeRDD(List(3,4,5,6),4)
    //Can only zip RDDs with same number of elements in each partition
    //zip requires each pair of corresponding partitions to contain the same number of elements
    val rdd1=sc.makeRDD(List(1,2,3,4,5,6),2)
    val rdd2=sc.makeRDD(List(3,4,5,6),2)


    val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))
    sc.stop()
  }

}

RDD Transformation Operators - partitionBy

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark14_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - Key-Value operators
    val rdd=sc.makeRDD(List(1,2,3,4))
    val mapRDD:RDD[(Int,Int)]=rdd.map((_,1))
    //RDD => PairRDDFunctions
    //implicit conversion (applied by the compiler)
    //partitionBy repartitions the data according to the given partitioner
    mapRDD.partitionBy(new HashPartitioner(2))
        .saveAsTextFile("output")


    sc.stop()
  }

}

RDD Transformation Operators - partitionBy - Questions to Think About

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark14_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - Key-Value operators
    val rdd=sc.makeRDD(List(1,2,3,4))
    val mapRDD:RDD[(Int,Int)]=rdd.map((_,1))
    //RDD => PairRDDFunctions
    //implicit conversion (applied by the compiler)
    //partitionBy repartitions the data according to the given partitioner
    val newRDD=mapRDD.partitionBy(new HashPartitioner(2))
    //question: what happens if we call partitionBy again with the same partitioner?
    newRDD.partitionBy(new HashPartitioner(2))
    newRDD.saveAsTextFile("output")


    sc.stop()
  }

}
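
One way to reason about this question (my own note, not from the course transcript; worth double-checking against the Spark source): HashPartitioner defines equality by its number of partitions, and partitionBy returns the same RDD without another shuffle when the new partitioner equals the RDD's current one. A small check under that assumption:

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark14_RDD_Operator_Transform_Check {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val mapRDD=sc.makeRDD(List(1,2,3,4)).map((_,1))
    val newRDD=mapRDD.partitionBy(new HashPartitioner(2))

    //two HashPartitioners with the same partition count compare as equal,
    //so repartitioning again with HashPartitioner(2) should change nothing
    println(new HashPartitioner(2)==new HashPartitioner(2))  //true
    println(newRDD.partitioner)                              //Some(HashPartitioner)

    //changing the partitioner type or the partition count does trigger a new shuffle
    println(newRDD.partitionBy(new HashPartitioner(4)).getNumPartitions)  //4

    sc.stop()
  }

}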

RDD Transformation Operators - reduceByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark15_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - reduceByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("a",3),("b",4)
    ))
    //reduceByKey aggregates the values of records that share the same key
    //In Scala, aggregation is generally done pairwise; Spark is built on Scala, so its aggregation is pairwise as well
    //[1,2,3]
    //[3,3]
    //[6]
    //If a key has only one value, reduceByKey does not invoke the function for it
    val reduceRDD=rdd.reduceByKey((x:Int,y:Int)=>{
      println(s"x=${x},y=${y}")
      x+y
    })
    reduceRDD.collect().foreach(println)

    sc.stop()
  }

}

RDD Transformation Operators - groupByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark16_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - groupByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("a",3),("b",4)
    ))
    //groupByKey: groups the records of the data source that share the same key, producing one tuple per key:
    //the first element of the tuple is the key,
    //the second element is the collection of all values that have that key
    val groupRDD:RDD[(String,Iterable[Int])]=rdd.groupByKey()
    groupRDD.collect().foreach(println)
    val groupRDD1:RDD[(String,Iterable[(String,Int)])]=rdd.groupBy(_._1)

    sc.stop()
  }

}

RDD Transformation Operators - aggregateByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - aggregateByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("a",3),("a",4)
    ),2)
    //(a,[1,2]),(a,[3,4])
    //(a,2),(a,4)
    //(a,6)
    //aggregateByKey is curried and has two parameter lists
    //The first parameter list takes one argument, the initial value,
    //  used for the intra-partition computation when the first value of a key is encountered
    //The second parameter list takes two arguments:
    //  the first is the intra-partition computation rule
    //  the second is the inter-partition computation rule
    rdd.aggregateByKey(0)(
      (x,y)=>math.max(x,y),
      (x,y)=>x+y
    ).collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - aggregateByKey - Exercise

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark18_RDD_Operator_Transform3 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - aggregateByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("b",3),
      ("b",4),("b",5),("a",6)
    ),2)

    //The result type returned by aggregateByKey must match the type of the initial value
//    rdd.foldByKey(0)(_+_).collect().foreach(println)
    //compute the average value of each key => (a,3),(b,4)
    val newRDD:RDD[(String,(Int,Int))]=rdd.aggregateByKey((0,0))(
      (t, v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    val resultRDD:RDD[(String,Int)]=newRDD.mapValues{
      case(num,cnt)=>{
        num/cnt
      }
    }
    resultRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - foldByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark17_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - foldByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("b",3),
      ("b",4),("b",5),("a",6)
    ),2)

//    rdd.aggregateByKey(10)(
//      _+_,
//      _+_
//    ).collect().foreach(println)
    //When the intra-partition and inter-partition rules are the same, Spark provides the simplified foldByKey
    rdd.foldByKey(0)(_+_).collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - combineByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark19_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - combineByKey
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("b",3),
      ("b",4),("b",5),("a",6)
    ),2)
    //combineByKey takes three parameters:
    //the first transforms the first value of each key into a new structure (the initial accumulator)
    //the second is the intra-partition computation rule
    //the third is the inter-partition computation rule
    val newRDD:RDD[(String,(Int,Int))]=rdd.combineByKey(
      v=>(v,1),
      (t:(Int,Int),v)=>{
        (t._1+v,t._2+1)
      },
      (t1:(Int,Int),t2:(Int,Int))=>{
        (t1._1+t2._1,t1._2+t2._2)
      }
    )
    val resultRDD:RDD[(String,Int)]=newRDD.mapValues{
      case(num,cnt)=>{
        num/cnt
      }
    }
    resultRDD.collect().foreach(println)

    sc.stop()
  }

}

RDD Transformation Operators - Differences Among the Aggregation Operators

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark20_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - differences among the aggregation operators
    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("b",3),
      ("b",4),("b",5),("a",6)
    ),2)

    rdd.reduceByKey(_+_)                                      //wordcount
    rdd.aggregateByKey(0)(_+_,_+_)                            //wordcount
    rdd.foldByKey(0)(_+_)                                     //wordcount
    rdd.combineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y)  //wordcount
    sc.stop()
  }

}
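
As far as I can tell from the Spark source (my own note; worth verifying against the version you use), all four operators above delegate to the same underlying combineByKeyWithClassTag method and differ only in the three functions they pass to it. The quickest way to see that they agree on this data is to run them side by side; the per-key sums all come out as (a,9) and (b,12).

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark20_RDD_Operator_Transform_Compare {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(
      ("a",1),("a",2),("b",3),
      ("b",4),("b",5),("a",6)
    ),2)

    //all four compute the same per-key sum
    println(rdd.reduceByKey(_+_).collect().toMap)
    println(rdd.aggregateByKey(0)(_+_,_+_).collect().toMap)
    println(rdd.foldByKey(0)(_+_).collect().toMap)
    println(rdd.combineByKey((v:Int)=>v,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y).collect().toMap)

    sc.stop()
  }

}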

RDD Transformation Operators - join

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark21_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - join
    val rdd1=sc.makeRDD(List(
      ("a",1),("b",2),("c",3)
    ))
    val rdd2=sc.makeRDD(List(
      ("a",4),("b",5),("c",6)
    ))
    //join: for two data sources, values whose keys match are connected into a tuple (key,(v1,v2))
    //keys that are not present in both data sources do not appear in the result
    //if a key appears multiple times in both sources, every combination is matched (a Cartesian product),
    //so the amount of data can grow geometrically and performance degrades
    val joinRDD=rdd1.join(rdd2)
    joinRDD.collect().foreach(println)


    sc.stop()
  }

}
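
A tiny illustration of the many-to-many case warned about above (my own example): if "a" appears twice in each source, join produces 2 x 2 = 4 records for that key, while the unmatched key "b" is dropped.

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark21_RDD_Operator_Transform_ManyToMany {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd1=sc.makeRDD(List(("a",1),("a",2),("b",3)))
    val rdd2=sc.makeRDD(List(("a",4),("a",5)))

    //(a,(1,4)),(a,(1,5)),(a,(2,4)),(a,(2,5))
    rdd1.join(rdd2).collect().foreach(println)

    sc.stop()
  }

}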

RDD Transformation Operators - leftOuterJoin & rightOuterJoin

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark22_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - leftOuterJoin & rightOuterJoin
    val rdd1=sc.makeRDD(List(
      ("a",1),("b",2),("c",3)
    ))
    val rdd2=sc.makeRDD(List(
      ("a",4),("b",5)//,("c",6)
    ))
    val leftJoinRDD=rdd1.leftOuterJoin(rdd2)
    val rightJoinRDD=rdd1.rightOuterJoin(rdd2)
    rightJoinRDD.collect().foreach(println)


    sc.stop()
  }

}

RDD Transformation Operators - cogroup

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark23_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - cogroup
    val rdd1=sc.makeRDD(List(
      ("a",1),("b",2)//,("c",3)
    ))
    val rdd2=sc.makeRDD(List(
      ("a",4),("b",5),("c",6),("c",7)
    ))
    //cogroup: connect + group (group the values by key within each source, then connect the groups across sources)
    val cgRDD=rdd1.cogroup(rdd2)
    cgRDD.collect().foreach(println)
    sc.stop()
  }

}

RDD Transformation Operators - Hands-On Case

Requirements
1. Data structure: timestamp, province, city, user, advertisement; fields are separated by spaces.
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
2. Requirement: for each province, compute the Top 3 advertisements by click count.
Implementation

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark24_RDD_Req {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    //TODO Operator - hands-on case
    //1. Read the raw data: timestamp, province, city, user, advertisement
    val dataRDD=sc.textFile("datas/agent.log")
    //2. Transform the raw data into a structure that is easy to aggregate
    //   timestamp, province, city, user, advertisement
    //   => ((province, advertisement), 1)
    val mapRDD=dataRDD.map(
      line=>{
        val datas=line.split(" ")
        ((datas(1),datas(4)),1)
      }
    )
    //3. Aggregate the transformed data by key
    //   ((province, advertisement), 1) => ((province, advertisement), sum)
    val reduceRDD:RDD[((String,String),Int)]=mapRDD.reduceByKey(_+_)
    //4. Transform the aggregated result
    //   ((province, advertisement), sum) => (province, (advertisement, sum))
    val newMapRDD=reduceRDD.map{
      case((prv,ad),sum)=>{
        (prv,(ad,sum))
      }
    }
    //5. Group the transformed data by province
    //   (province, [(adA, sumA), (adB, sumB), ...])
    val groupRDD=newMapRDD.groupByKey()
    //6. Sort each group in descending order and take the top 3
    val resultRDD=groupRDD.mapValues(
      iter=>{
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )
    //7. Collect the result and print it to the console
    resultRDD.collect().foreach(println)
    sc.stop()

  }

}

RDD Action Operators

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4))
    //TODO Action operators
    //An action operator is a method that triggers the execution of a job
    //Under the hood it calls the runJob method of the environment object (SparkContext),
    //which creates an ActiveJob and submits it for execution
    rdd.collect()
    sc.stop()
  }

}

RDD Action Operators - Demo of Common Operators

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4))
    //TODO Action operators
    //reduce
//    val i:Int=rdd.reduce(_+_)
//    println(i)
    //collect: gathers the data of all partitions, in partition order, into an array in Driver memory
//    val ints:Array[Int]=rdd.collect()
//    println(ints.mkString(","))
    //count: the number of elements in the data source
    val cnt=rdd.count()
    println(cnt)
    //first: the first element of the data source
    val first=rdd.first()
    println(first)
    //take: the first n elements
    val ints=rdd.take(3)
    println(ints.mkString(","))
    //takeOrdered: sorts the data, then takes the first n elements
    val rdd1=sc.makeRDD(List(4,3,2,1))
    val ints1=rdd1.takeOrdered(3)
    println(ints1.mkString(","))
    sc.stop()
  }

}

RDD Action Operators - aggregate

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    //TODO Action operators
    //aggregateByKey: the initial value only takes part in the intra-partition computation
    //aggregate: the initial value takes part in both the intra-partition and the inter-partition computation
    val result=rdd.aggregate(0)(_+_,_+_)
    println(result)
    sc.stop()
  }

}
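
A quick worked check of that comment (my own sketch; it assumes the two partitions split as [1,2] and [3,4]): with an initial value of 10, the intra-partition sums are 10+1+2=13 and 10+3+4=17, and the inter-partition merge adds the initial value once more, 10+13+17=40.

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action_InitValue {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4),2)

    //intra-partition: 10+1+2=13 and 10+3+4=17; inter-partition: 10+13+17=40
    println(rdd.aggregate(10)(_+_,_+_))  //40

    //fold is the simplified form for when both rules are the same function
    println(rdd.fold(10)(_+_))           //40

    sc.stop()
  }

}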

RDD Action Operators - countByValue & countByKey

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    val rdd1=sc.makeRDD(List(
      ("a",1),("a",2),("a",3)
    ))
    //TODO Action operators
    val intToLong=rdd.countByValue()
    println(intToLong)
    val stringToLong= rdd1.countByKey()
    println(stringToLong)
    sc.stop()
  }

}

RDD Action Operators - save Methods

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4),2)
    val rdd1=sc.makeRDD(List(
      ("a",1),("a",2),("a",3)
    ))
    //TODO Action operators
    rdd1.saveAsTextFile("output")
    rdd1.saveAsObjectFile("output1")
    sc.stop()
  }

}

RDD Action Operators - foreach

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4))

    //TODO Action operators
    //collect().foreach: here foreach is the ordinary loop over a collection in Driver memory
    rdd.collect().foreach(println)
    println("**************")
    //rdd.foreach: the data is printed on the Executor side, so the output order is not guaranteed
    rdd.foreach(println)
    sc.stop()
  }

}

RDD Serialization - Closure Check

package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc=new SparkContext(sparkConf)
    val rdd=sc.makeRDD(List(1,2,3,4))

    val user=new User()
    //SparkException: Task not serializable
    //NotSerializableException: com.atguigu.bigdata.sparkcore.wc.rdd.operator.action.Spark07_RDD_Operator_Action$User
    //A function passed to an RDD operator forms a closure over outer variables (here, user),
    //so Spark performs a closure check: everything captured by the closure must be serializable
    rdd.foreach(
      num=>{
        println("age="+(user.age+num))
      }
    )
    sc.stop()
  }
//  class User extends Serializable {
  //a case class automatically mixes in the serialization trait at compile time (it implements Serializable)
  case class User(){
    var age:Int=30
  }
}

RDD - Different Ways to Implement WordCount

package com.atguigu.bigdata.sparkcore.wc.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark03_WordCount1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    wordcount9(sc)
    sc.stop()
  }


  //groupBy
  def wordcount1(sc:SparkContext)={
    val rdd=sc.makeRDD(List("Hello Scala","Hello,Spark"))
    val words=rdd.flatMap(_.split(" "))
    val group=words.groupBy(word=>word)
    val wordCount=group.mapValues(iter=>iter.size)
  }
  //groupByKey
  def wordcount2(sc:SparkContext)= {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val group = wordOne.groupByKey()
    val wordCount = group.mapValues(iter => iter.size)
  }
    //reduceByKey
    def wordcount3(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordOne = words.map((_, 1))
      val wordCount = wordOne.reduceByKey(_ + _)
    }
    //aggregateByKey
    def wordcount4(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordOne = words.map((_, 1))
      val wordCount = wordOne.aggregateByKey(0)(_+_,_+_)
    }
    //foldByKey
    def wordcount5(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordOne = words.map((_, 1))
      val wordCount = wordOne.foldByKey(0)(_+_)
    }
    //combineByKey
    def wordcount6(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordOne = words.map((_, 1))
      val wordCount = wordOne.combineByKey(
        v=>v,
        (x:Int,y)=>x+y,
        (x:Int,y:Int)=>x+y
      )
    }
    //countByKey
    def wordcount7(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordOne = words.map((_, 1))
      val stringToLong=wordOne.countByKey()
    }
    //countByValue
    def wordcount8(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      val wordcount=words.countByValue()
    }
    //reduce aggregate fold
    def wordcount9(sc:SparkContext)= {
      val rdd = sc.makeRDD(List("Hello Scala", "Hello,Spark"))
      val words = rdd.flatMap(_.split(" "))
      //[(word,count),(word,count)]
      //word=>Map[(word,1)]
      val mapWord=words.map(
        word=>{
          mutable.Map[String,Long]((word,1))
        }
      )
      val wordCount=mapWord.reduce(
        (map1,map2)=>{
          map2.foreach{
            case (word,count)=>{
              val newCount=map1.getOrElse(word,0L)+count
              map1.update(word,newCount)
            }
          }
          map1
        }
      )
      println(wordCount)
    }

}
