Spark
- RDD - Transformation operators
- RDD - Transformation operators - map
- RDD - Transformation operators - mapPartitions
- RDD - Transformation operators - mapPartitions - exercise
- RDD - Transformation operators - mapPartitionsWithIndex
- RDD - Transformation operators - flatMap
- RDD - Transformation operators - glom
- RDD - Transformation operators - understanding that partitioning is preserved
- RDD - Transformation operators - groupBy
- RDD - Transformation operators - filter
- RDD - Transformation operators - sample
- RDD - Transformation operators - distinct
- RDD - Transformation operators - coalesce
- RDD - Transformation operators - repartition
- RDD - Transformation operators - sortBy
- RDD - Transformation operators - intersection & union & subtract & zip
- RDD - Transformation operators - partitionBy
- RDD - Transformation operators - reduceByKey
- RDD - Transformation operators - groupByKey
- RDD - Transformation operators - aggregateByKey
- RDD - Transformation operators - foldByKey
- RDD - Transformation operators - combineByKey
- RDD - Transformation operators - differences among the aggregation operators
- RDD - Transformation operators - join
- RDD - Transformation operators - leftOuterJoin & rightOuterJoin
- RDD - Transformation operators - cogroup
- RDD - Transformation operators - case study
- RDD - Action operators
- RDD - Different ways to implement WordCount
RDD - Transformation operators
RDD - Transformation operators - map
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - map
val rdd =sc.makeRDD(List(1,2,3,4))
//1,2,3,4
//2,4,6,8
// mapping function
def mapFunction(num:Int):Int={
num * 2
}
// val mapRDD:RDD[Int]=rdd.map(mapFunction)
// val mapRDD:RDD[Int]=rdd.map((num:Int)=>{num*2})
// val mapRDD:RDD[Int]=rdd.map((num:Int)=>num*2)
// val mapRDD:RDD[Int]=rdd.map((num)=>num*2)
// val mapRDD:RDD[Int]=rdd.map(num=>num*2)
val mapRDD:RDD[Int]=rdd.map(_*2)
mapRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - map - demonstrating parallel computation
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Par {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - map
//1. Within a partition, an RDD processes its data one element at a time:
//   only after the previous element has gone through all of the chained logic does the next element start.
//   Execution within a partition is therefore ordered.
val rdd =sc.makeRDD(List(1,2,3,4),1)
val mapRDD=rdd.map(
num=>{
println(">>>>>>>>>>"+num)
num
}
)
val mapRDD1=rdd.map(
num=>{
println("##########"+num)
num
}
)
mapRDD.collect()
mapRDD1.collect()
sc.stop()
}
}
RDD - Transformation operators - mapPartitions
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - mapPartitions
val rdd=sc.makeRDD(List(1,2,3,4),2)
// mapPartitions: transforms data one partition at a time,
// but it loads the whole partition into memory and keeps a reference to it.
// Processed data is not released while the partition is still referenced,
// so with little memory and a large data volume this can easily cause an out-of-memory error.
val mpRDD:RDD[Int]=rdd.mapPartitions(
iter=>{
println(">>>>>>>>>>>>>>")
iter.map(_*2)
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - mapPartitions - exercise
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - mapPartitions
val rdd=sc.makeRDD(List(1,2,3,4),2)
//[1,2],[3,4]
//[2],[4]
val mpRDD=rdd.mapPartitions(
iter=>{
List(iter.max).iterator
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - the difference between mapPartitions & map
1. map(): processes one element at a time.
2. mapPartitions(): processes one partition at a time; the data of that partition in the source RDD can only be released after the whole partition has been processed, which may lead to OOM.
3. Development guideline: when memory is plentiful, prefer mapPartitions() for better throughput (see the sketch below).
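The sketch below is not from the original notes; it assumes an existing SparkContext `sc` like in the examples above, and shows where the difference usually matters, namely per-element versus per-partition setup cost:
val data = sc.makeRDD(List(1, 2, 3, 4), 2)
// map: any setup placed inside the function runs once per element
val doubledWithMap = data.map(num => {
  // imagine an expensive resource (e.g. a connection) being created here, for every single element
  num * 2
})
// mapPartitions: the function receives the whole partition as one iterator,
// so any setup placed here runs only once per partition
val doubledWithMapPartitions = data.mapPartitions(iter => {
  // the expensive resource would be created here, once per partition
  iter.map(_ * 2)
})
println(doubledWithMap.collect().mkString(","))
println(doubledWithMapPartitions.collect().mkString(","))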
RDD - Transformation operators - mapPartitionsWithIndex
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - mapPartitionsWithIndex
val rdd=sc.makeRDD(List(1,2,3,4),2)
//[1,2],[3,4]
// keep only the data of the second partition (partition index 1)
val mpiRDD=rdd.mapPartitionsWithIndex(
(index,iter)=>{
if(index==1){
iter
}else{
Nil.iterator
}
}
)
mpiRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - mapPartitionsWithIndex
// list the partition index of every element
val rdd=sc.makeRDD(List(1,2,3,4))
val mpiRDD=rdd.mapPartitionsWithIndex(
(index,iter)=>{
//1,2,3,4
//(0,1)(2,2),(4,3),(6,4)
iter.map(
num=>{
(index,num)
}
)
}
)
mpiRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - flatMap
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - flatMap
val rdd:RDD[List[Int]]=sc.makeRDD(List(
List(1,2),List(3,4)
))
val flatRDD:RDD[Int]=rdd.flatMap(
list=>{
list
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - flatMap
val rdd:RDD[String]=sc.makeRDD(List(
"Hello Scala","Hello Spark"
))
val flatRDD:RDD[String]=rdd.flatMap(
s=>{
s.split(" ")
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - flatMap - exercise
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - flatMap
val rdd=sc.makeRDD(List(
List(1,2),3,List(4,5)
))
val flatRDD=rdd.flatMap(
data=>{
data match{
case list:List[_]=>list
case dat=>List(dat)
}
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - glom
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - glom
val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
// flatMap went from whole to parts: List => Int
// glom goes the other way, from parts to whole: Int => Array (one array per partition)
val glomRDD:RDD[Array[Int]]=rdd.glom()
glomRDD.collect().foreach(data=>println(data.mkString(",")))
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - glom
val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
//[1,2],[3,4]
//[2],[4]   take the maximum of each partition
//[6]       then sum the maxima
val glomRDD:RDD[Array[Int]]=rdd.glom()
val maxRDD:RDD[Int]=glomRDD.map(
array=>{
array.max
}
)
println(maxRDD.collect().sum)
sc.stop()
}
}
RDD - Transformation operators - understanding that partitioning is preserved
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Part {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - map
val rdd=sc.makeRDD(List(1,2,3,4),2)
//[1,2],[3,4]
rdd.saveAsTextFile("output")
val mapRDD=rdd.map(_*2)
//[2,4],[6,8]
mapRDD.saveAsTextFile("output1")
sc.stop()
}
}
RDD - Transformation operators - groupBy
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - groupBy
val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4),2)
// groupBy evaluates the grouping function on every element of the source and groups by the returned key:
// elements that produce the same key are placed in the same group
def groupFunction(num:Int):Int={
num % 2
}
val groupRDD=rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - groupBy
val rdd=sc.makeRDD(List("Hello","Spark","Hello","Hadoop"),2)
// grouping has no inherent relationship with partitioning
val groupRDD=rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - groupBy - exercise
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - groupBy
val rdd=sc.textFile("datas/apache.log")
val timeRDD=rdd.map(
line=>{
val datas=line.split(" ")
val time=datas(3)
//time.substring(0, )
val sdf=new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val date:Date=sdf.parse(time)
val sdf1=new SimpleDateFormat("HH")
val hour:String=sdf1.format(date)
(hour,1)
}
).groupBy(_._1)
timeRDD.map {
case (hour, iter) => {
(hour, iter.size)
}
}.collect.foreach(println)
sc.stop()
}
}
RDD - Transformation operators - filter
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - filter
val rdd:RDD[Int]=sc.makeRDD(List(1,2,3,4))
val filterRDD:RDD[Int]=rdd.filter(num=>num%2!=0)
filterRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - filter
val rdd=sc.textFile("datas/apache.log")
rdd.filter(
line=>{
val datas=line.split(" ")
val time=datas(3)
time.startsWith("17/05/2015")
}
).collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - sample
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - sample
val rdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
// sample takes three parameters:
// 1. the first parameter: whether a drawn element is put back, true (with replacement) or false (without replacement)
// 2. the second parameter:
//    without replacement: the fraction (probability) with which each element of the source is selected
//    with replacement: the expected number of times each element may be selected
//    (it acts as a probability threshold, not an exact count)
// 3. the third parameter: the seed of the random number generator;
//    if it is omitted, the current system time is used
// println(rdd.sample(
//   false,
//   0.4
//   // 1
// ).collect().mkString(","))
println(rdd.sample(
true,
2
// 1
).collect().mkString(","))
sc.stop()
}
}
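Related note (not from the original code): sample returns an RDD and the fraction is not exact. When an exact number of samples is wanted on the Driver, Spark also provides the takeSample action; a minimal sketch, assuming the same `sc`:
val sampleRdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
// takeSample is an action: it returns an Array to the Driver instead of an RDD.
// Draw exactly 3 elements without replacement; an optional third argument is the random seed.
val picked: Array[Int] = sampleRdd.takeSample(withReplacement = false, num = 3)
println(picked.mkString(","))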
RDD - Transformation operators - distinct
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - distinct
val rdd=sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1:RDD[Int]=rdd.distinct()
rdd1.collect().foreach(println)
sc.stop()
}
}
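For reference (consistent with how distinct is implemented in Spark's source, written out here as a sketch assuming the same `sc`), distinct removes duplicates through the shuffle operators, roughly map + reduceByKey + map:
val dupRdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
// pair every element with null, collapse duplicates per key, then drop the null
val deduplicated = dupRdd
  .map(x => (x, null))
  .reduceByKey((x, _) => x)
  .map(_._1)
deduplicated.collect().foreach(println)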
RDD - Transformation operators - coalesce
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - coalesce
val rdd=sc.makeRDD(List(1,2,3,4,5,6),3)
// By default coalesce does not shuffle, i.e. it does not break partitions apart and recombine the data.
// Shrinking the partition count this way can leave the data unbalanced (data skew).
// To rebalance the data, enable shuffle (pass true as the second argument).
// val newRDD=rdd.coalesce(2)
val newRDD=rdd.coalesce(2,true)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
RDD - Transformation operators - repartition
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - repartition
val rdd=sc.makeRDD(List(1,2,3,4,5,6),2)
// coalesce can also increase the number of partitions, but without shuffle that has no effect,
// so increasing the partition count requires a shuffle.
// Spark provides a simplified operator for this:
// - shrink partitions: coalesce (enable shuffle if the data should be rebalanced)
// - increase partitions: repartition, which internally just calls coalesce with shuffle = true
// val newRDD=rdd.coalesce(3,true)
val newRDD=rdd.repartition(3)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
RDD - Transformation operators - sortBy
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - sortBy
val rdd=sc.makeRDD(List(6,2,4,5,3,1),2)
val newRDD:RDD[Int]=rdd.sortBy(num=>num)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark12_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - sortBy
val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)),2)
// sortBy sorts the source data by the given key; it sorts ascending by default,
// and the second argument switches the order (false = descending).
// sortBy does not change the number of partitions, but it does perform a shuffle.
val newRDD=rdd.sortBy(t=>t._1.toInt,false)
newRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - intersection & union & subtract & zip
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - double-value (two-RDD) operations
val rdd1=sc.makeRDD(List(1,2,3,4))
val rdd2=sc.makeRDD(List(3,4,5,6))
// intersection [3,4]
val rdd3:RDD[Int]=rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// union [1,2,3,4,5,6]
val rdd4:RDD[Int]=rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// subtract (difference) [1,2]
val rdd5:RDD[Int]=rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// zip [(1,3),(2,4),(3,5),(4,6)]
val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
RDD - Transformation operators - intersection & union & subtract & zip - notes
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - double-value (two-RDD) operations
// intersection, union and subtract require both data sources to have the same element type;
// zip allows the two data sources to have different element types
val rdd1=sc.makeRDD(List(1,2,3,4))
val rdd2=sc.makeRDD(List(3,4,5,6))
val rdd7=sc.makeRDD(List("3","4","5","6"))
// intersection [3,4]
val rdd3:RDD[Int]=rdd1.intersection(rdd2)
// val rdd8:RDD[Int]=rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// union [1,2,3,4,5,6]
val rdd4:RDD[Int]=rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// subtract (difference) [1,2]
val rdd5:RDD[Int]=rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// zip [(1,3),(2,4),(3,5),(4,6)]
val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
// val rdd8:RDD[(Int,String)]=rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - double-value (two-RDD) operations
// Can't zip RDDs with unequal numbers of partitions: List(2, 4)
// zip requires both data sources to have the same number of partitions
// val rdd1=sc.makeRDD(List(1,2,3,4),2)
// val rdd2=sc.makeRDD(List(3,4,5,6),4)
// Can only zip RDDs with same number of elements in each partition
// zip also requires each pair of partitions to contain the same number of elements
val rdd1=sc.makeRDD(List(1,2,3,4,5,6),2)
val rdd2=sc.makeRDD(List(3,4,5,6),2)
val rdd6:RDD[(Int,Int)]=rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
RDD - Transformation operators - partitionBy
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark14_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - key-value type
val rdd=sc.makeRDD(List(1,2,3,4))
val mapRDD:RDD[(Int,Int)]=rdd.map((_,1))
// RDD => PairRDDFunctions
// implicit conversion ("secondary compilation")
// partitionBy repartitions the data according to the given Partitioner
mapRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
sc.stop()
}
}
RDD - Transformation operators - partitionBy - questions to think about
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark14_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - key-value type
val rdd=sc.makeRDD(List(1,2,3,4))
val mapRDD:RDD[(Int,Int)]=rdd.map((_,1))
// RDD => PairRDDFunctions
// implicit conversion ("secondary compilation")
// partitionBy repartitions the data according to the given Partitioner
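// If partitionBy is called again with an "equal" partitioner (same type and number of partitions),
// it detects that the RDD is already partitioned that way and returns the same RDD, so no second shuffle happens;
// only a different partitioner produces a new ShuffledRDD.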
val newRDD=mapRDD.partitionBy(new HashPartitioner(2))
newRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")
sc.stop()
}
}
RDD - Transformation operators - reduceByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark15_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - reduceByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("a",3),("b",4)
))
// reduceByKey: aggregates the values of entries that share the same key.
// In Scala, aggregation is generally done pairwise (two at a time); Spark is built on Scala, so its aggregation is pairwise as well:
// [1,2,3]
// [3,3]
// [6]
// If a key has only a single value, reduceByKey does not invoke the function for it.
val reduceRDD=rdd.reduceByKey((x:Int,y:Int)=>{
println(s"x=${x},y=${y}")
x+y
})
reduceRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - groupByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark16_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - groupByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("a",3),("b",4)
))
// groupByKey: puts entries with the same key into one group, producing a pair (tuple) per key:
// the first element of the tuple is the key,
// the second element is the collection of all values that share that key
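// Compared with reduceByKey: groupByKey only groups, so every value is shuffled across the network,
// while reduceByKey pre-aggregates (combines) inside each partition before the shuffle and moves far less data.
// When the end goal is an aggregation, prefer reduceByKey.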
val groupRDD:RDD[(String,Iterable[Int])]=rdd.groupByKey()
groupRDD.collect().foreach(println)
val groupRDD1:RDD[(String,Iterable[(String,Int)])]=rdd.groupBy(_._1)
sc.stop()
}
}
RDD - Transformation operators - aggregateByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - aggregateByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("a",3),("a",4)
),2)
//(a,[1,2]),(a,[3,4])
//(a,2),(a,4)
//(a,6)
// aggregateByKey is curried and has two parameter lists:
// the first parameter list takes one argument, the initial (zero) value,
//   which is used for the intra-partition computation when the first value of a key is encountered;
// the second parameter list takes two arguments:
//   the first is the intra-partition computation rule,
//   the second is the inter-partition computation rule.
rdd.aggregateByKey(0)(
(x,y)=>math.max(x,y),
(x,y)=>x+y
).collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - aggregateByKey - exercise
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark18_RDD_Operator_Transform3 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - aggregateByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),2)
// The final result type of aggregateByKey matches the type of the initial value.
// rdd.foldByKey(0)(_+_).collect().foreach(println)
// Compute the average value of each key => (a,3),(b,4)
val newRDD:RDD[(String,(Int,Int))]=rdd.aggregateByKey((0,0))(
(t, v) => {
(t._1 + v, t._2 + 1)
},
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD:RDD[(String,Int)]=newRDD.mapValues{
case(num,cnt)=>{
num/cnt
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - foldByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - foldByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),2)
// rdd.aggregateByKey(10)(
//   _+_,
//   _+_
// ).collect().foreach(println)
// When the intra-partition and inter-partition aggregation rules are the same function, Spark offers a simplified operator: foldByKey
rdd.foldByKey(0)(_+_).collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - combineByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - combineByKey
val rdd=sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),2)
// combineByKey takes three parameters:
// the first transforms the first value of each key into the desired structure (the "create combiner" step);
// the second is the intra-partition computation rule;
// the third is the inter-partition computation rule.
val newRDD:RDD[(String,(Int,Int))]=rdd.combineByKey(
v=>(v,1),
(t:(Int,Int),v)=>{
(t._1+v,t._2+1)
},
(t1:(Int,Int),t2:(Int,Int))=>{
(t1._1+t2._1,t1._2+t2._2)
}
)
val resultRDD:RDD[(String,Int)]=newRDD.mapValues{
case(num,cnt)=>{
num/cnt
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - differences among the aggregation operators
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark20_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - differences among the aggregation operators
val rdd=sc.makeRDD(List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),2)
rdd.reduceByKey(_+_) // wordcount
rdd.aggregateByKey(0)(_+_,_+_) // wordcount
rdd.foldByKey(0)(_+_) // wordcount
rdd.combineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y) // wordcount
sc.stop()
}
}
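Summary of the differences (this reflects how the four operators work in Spark, where all of them ultimately delegate to combineByKeyWithClassTag):
1. reduceByKey: no initial value; the first value of a key is used as-is; the intra-partition and inter-partition rules are the same function.
2. aggregateByKey: an initial value is combined with the first value of a key inside the partition; the intra-partition and inter-partition rules can differ.
3. foldByKey: an initial value is combined with the first value of a key inside the partition; the intra-partition and inter-partition rules are the same function.
4. combineByKey: the first value of a key is transformed by a user-supplied function; the intra-partition and inter-partition rules can differ.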
RDD - Transformation operators - join
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - join
val rdd1=sc.makeRDD(List(
("a",1),("b",2),("c",3)
))
val rdd2=sc.makeRDD(List(
("a",4),("b",5),("c",6)
))
// join: for two data sources, values whose keys match are connected into a tuple (key, (v1, v2)).
// Keys that have no match in the other data source do not appear in the result.
// If a key occurs multiple times in both sources, every combination is matched (a Cartesian product per key),
// so the amount of data can grow geometrically and performance can degrade badly.
val joinRDD=rdd1.join(rdd2)
joinRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - leftOuterJoin & rightOuterJoin
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark22_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - leftOuterJoin & rightOuterJoin
val rdd1=sc.makeRDD(List(
("a",1),("b",2),("c",3)
))
val rdd2=sc.makeRDD(List(
("a",4),("b",5)//,("c",6)
))
val leftJoinRDD=rdd1.leftOuterJoin(rdd2)
val rightJoinRDD=rdd1.rightOuterJoin(rdd2)
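// Expected output for these inputs (order may vary):
//   leftOuterJoin  => (a,(1,Some(4))), (b,(2,Some(5))), (c,(3,None))   (keys missing on the right become None)
//   rightOuterJoin => (a,(Some(1),4)), (b,(Some(2),5))                 (only keys present on the right appear)
leftJoinRDD.collect().foreach(println)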
rightJoinRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - cogroup
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark23_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operator - cogroup
val rdd1=sc.makeRDD(List(
("a",1),("b",2)//,("c",3)
))
val rdd2=sc.makeRDD(List(
("a",4),("b",5),("c",6),("c",7)
))
// cogroup = connect + group: first group the values by key within each RDD, then connect the groups across the RDDs
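// Expected output for these inputs: every key from either source appears once,
// paired with the full collection of its values from each RDD (empty if the key is absent there);
// in current Spark versions the collections print as CompactBuffer, e.g.:
// (a,(CompactBuffer(1),CompactBuffer(4)))
// (b,(CompactBuffer(2),CompactBuffer(5)))
// (c,(CompactBuffer(),CompactBuffer(6, 7)))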
val cgRDD=rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Transformation operators - case study
Requirement
1. Data structure: timestamp, province, city, user, advertisement; the fields are separated by spaces.
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
2. Requirement: compute, for each province, the Top 3 advertisements by number of clicks.
Implementation
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark24_RDD_Req {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
// TODO operators - case study
// 1. Read the raw data: timestamp, province, city, user, advertisement
val dataRDD=sc.textFile("datas/agent.log")
// 2. Transform the raw data into a structure that is convenient for counting:
//    timestamp, province, city, user, advertisement
//    =>
//    ((province, advertisement), 1)
val mapRDD=dataRDD.map(
line=>{
val datas=line.split(" ")
((datas(1),datas(4)),1)
}
)
// 3. Aggregate the transformed data by key:
//    ((province, advertisement), 1) => ((province, advertisement), sum)
val reduceRDD:RDD[((String,String),Int)]=mapRDD.reduceByKey(_+_)
// 4. Reshape the aggregated result:
//    ((province, advertisement), sum) => (province, (advertisement, sum))
val newMapRDD=reduceRDD.map{
case((prv,ad),sum)=>{
(prv,(ad,sum))
}
}
// 5. Group the reshaped data by province:
//    (province, [(adA, sumA), (adB, sumB), ...])
val groupRDD=newMapRDD.groupByKey()
// 6. Sort each group in descending order by click count and take the top 3
val resultRDD=groupRDD.mapValues(
iter=>{
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// 7. Collect the result and print it to the console
resultRDD.collect().foreach(println)
sc.stop()
}
}
RDD - Action operators
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4))
// TODO action operators
// An action operator is simply a method that triggers job execution.
// Under the hood it calls the runJob method of the environment object (SparkContext),
// which creates an ActiveJob and submits it for execution.
rdd.collect()
sc.stop()
}
}
RDD - Action operators - demonstration
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4))
// TODO action operators
// reduce
// val i:Int=rdd.reduce(_+_)
// println(i)
// collect: gathers the data of all partitions, in partition order, into an array in the Driver's memory
// val ints:Array[Int]=rdd.collect()
// println(ints.mkString(","))
// count: the number of elements in the data source
val cnt=rdd.count()
println(cnt)
// first: the first element of the data source
val first=rdd.first()
println(first)
// take: the first N elements
val ints=rdd.take(3)
println(ints.mkString(","))
// takeOrdered: sorts the data and then takes the first n elements
val rdd1=sc.makeRDD(List(4,3,2,1))
val ints1=rdd1.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
RDD - Action operators - aggregate
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4),2)
// TODO action operators
// aggregateByKey: the initial value participates only in the intra-partition computation
// aggregate: the initial value participates in the intra-partition computation AND in the inter-partition computation
val result=rdd.aggregate(0)(_+_,_+_)
println(result)
sc.stop()
}
}
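To make the effect of the initial value concrete (a small added sketch, assuming the same `sc` and a 2-partition RDD as above):
val rdd2p = sc.makeRDD(List(1, 2, 3, 4), 2)   // partitions: [1,2] and [3,4]
// with an initial value of 10:
// intra-partition: 10 + 1 + 2 = 13   and   10 + 3 + 4 = 17
// inter-partition: 10 + 13 + 17 = 40 (the initial value is applied once more when combining)
println(rdd2p.aggregate(10)(_ + _, _ + _))    // 40
// fold is the simplified form for when both rules are the same function
println(rdd2p.fold(10)(_ + _))                // 40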
RDD - Action operators - countByValue & countByKey
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4),2)
val rdd1=sc.makeRDD(List(
("a",1),("a,2"),("a",3)
))
//TODO 行动算子
val intToLong=rdd.countByValue()
println(intToLong)
val stringToLong= rdd1.countByKey()
println(stringToLong)
sc.stop()
}
}
RDD - Action operators - save methods
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4),2)
val rdd1=sc.makeRDD(List(
("a",1),("a,2"),("a",3)
))
//TODO 行动算子
rdd1.saveAsTextFile("output")
rdd1.saveAsObjectFile("output1")
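// saveAsSequenceFile also exists, but only for key-value (K, V) data, so it could be called on rdd1 but not on rdd:
// rdd1.saveAsSequenceFile("output2")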
sc.stop()
}
}
RDD - Action operators - foreach
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4))
// TODO action operators
// collect().foreach: this foreach is the ordinary Scala collection method, looping over the collected data in the Driver's memory
rdd.collect().foreach(println)
println("**************")
// rdd.foreach: this foreach is the RDD operator, so the printing is done by the Executors on their own partitions of the data
rdd.foreach(println)
sc.stop()
}
}
RDD - Serialization - closure check
package com.atguigu.bigdata.sparkcore.wc.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc=new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(1,2,3,4))
val user=new User()
//SparkException: Task not serializable
//NotSerializableException: com.atguigu.bigdata.sparkcore.wc.rdd.operator.action.Spark07_RDD_Operator_Action$User
// The function passed to an RDD operator captures outside variables (a closure),
// so Spark performs a closure check on it
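// The check runs when the operator is invoked: SparkContext.clean calls the ClosureCleaner,
// which verifies that the closure is serializable before any task is shipped and otherwise throws
// the "Task not serializable" exception shown above (here, because the closure captures the User instance).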
rdd.foreach(
num=>{
println("age="+(user.age+num))
}
)
sc.stop()
}
// class User extends Serializable {
// A case class automatically mixes in the Serializable trait at compile time (i.e. it implements the serializable interface)
case class User(){
var age:Int=30
}
}
RDD - Different ways to implement WordCount
package com.atguigu.bigdata.sparkcore.wc.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark03_WordCount1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparkConf)
wordcount9(sc)
sc.stop()
}
//groupBy
def wordcount1(sc:SparkContext)={
val rdd=sc.makeRDD(List("Hello Scala","Hello,Spark"))
val words=rdd.flatMap(_.split(" "))
val group=words.groupBy(word=>word)
val wordCount=group.mapValues(iter=>iter.size)
}
//groupByKey
def wordcount2(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group = wordOne.groupByKey()
val wordCount = group.mapValues(iter => iter.size)
}
//reduceByKey
def wordcount3(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount = wordOne.reduceByKey(_ + _)
}
//aggregateByKey
def wordcount4(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount = wordOne.aggregateByKey(0)(_+_,_+_)
}
//foldByKey
def wordcount5(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount = wordOne.foldByKey(0)(_+_)
}
//combineByKey
def wordcount6(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount = wordOne.combineByKey(
v=>v,
(x:Int,y)=>x+y,
(x:Int,y:Int)=>x+y
)
}
//countByKey
def wordcount7(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val stringToLong=wordOne.countByKey()
}
//countByValue
def wordcount8(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordcount=words.countByValue()
}
//reduce aggregate fold
def wordcount9(sc:SparkContext)= {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
//[(word,count),(word,count)]
//word=>Map[(word,1)]
val mapWord=words.map(
word=>{
mutable.Map[String,Long]((word,1))
}
)
val wordCount=mapWord.reduce(
(map1,map2)=>{
map2.foreach{
case (word,count)=>{
val newCount=map1.getOrElse(word,0L)+count
map1.update(word,newCount)
}
}
map1
}
)
println(wordCount)
}
}