目录
3.6 Action
Action 用来触发RDD的计算,得到相关计算结果;
Action触发Job。一个Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job;
典型的Action算子: collect / count
collect() => sc.runJob() => ... => dagScheduler.runJob() => 触发了Job
要求:能快速准确的区分Transformation、Action
collect() / collectAsMap()
stats / count / mean / stdev / max / min
reduce(func) / fold(func) / aggregate(func)
first():Return the first element in this RDD
take(n):Take the first num elements of the RDD
top(n):按照默认(降序)或者指定的排序规则,返回前num个元素。
takeSample(withReplacement, num, [seed]):返回采样的数据
foreach(func) / foreachPartition(func):与map、mapPartitions类似,区别是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
// 返回统计信息。仅能作用 RDD[Double] 类型上调用
val rdd1 = sc.range(1, 101)
rdd1.stats // 查看rdd1的统计信息, 其中项目可以单独调用
// res14: org.apache.spark.util.StatCounter = (count: 100, mean: 50.500000, stdev: 28.866070, max: 100.000000, min: 1.000000)
val rdd2 = sc.range(1, 101)
// 不能调用
rdd1.zip(rdd2).stats // 多组元素用不了
// <console>:28: error: value stats is not a member of org.apache.spark.rdd.RDD[(Long, Long)]
// count在各种类型的RDD上,均能调用
rdd1.zip(rdd2).count
// res16: Long = 100
// 聚合操作
val rdd = sc.makeRDD(1 to 10, 2)
rdd.reduce(_+_)
// res17: Int = 55
// fold和reduce的区别就是, 可以给一个初值
rdd.fold(0)(_+_) // 可以给初值0
// res18: Int = 55
rdd.fold(1)(_+_) // 给初值1, 结果多3, 详细解释看下面的图
// res19: Int = 58
rdd.fold(1)((x, y) => {
println(s"x=$x, y=$y")
x+y
})
// 详细结果, 参见下面的图 与a+b 和 x+y 一样
rdd.aggregate(0)(_+_, _+_) // 初始值的数据类型要一致才行
// res7: Int = 55
rdd.aggregate(1)(_+_, _+_) // _+_ 局部汇总规则 _+_ 全局汇总规则
// res8: Int = 58
// rdd.getNumPartitions
// res18: Int = 2 分区是2, 所以下面 +3
rdd.aggregate(1)(
(a, b) => {
println(s"a=$a, b=$b")
a+b
},
(x, y) => {
println(s"x=$x, y=$y")
x+y
}
)
// 详细结果, 参见下面的图
====================================================
// first / take(n) / top(n) :获取RDD中的元素。多用于测试
rdd.first
// res10: Int = 1
rdd.take(10) // 返回前10个元素
// res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd.top(10) // 返回最大的前10个
// res12: Array[Int] = Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
// 采样并返回结果
rdd.takeSample(false, 5)
// res13: Array[Int] = Array(9, 2, 4, 8, 1)
// 保存文件到指定路径(rdd有多少分区,就保存为多少文件)
// 注意小文件问题, 可以使用coalesce缩减分区
rdd.coalesce(1).saveAsTextFile("data/t1")
val rdd = (1 to 10)
rdd: scala.collection.immutable.Range.Inclusive = Range 1 to 10
// foreach 没有返回值, 需要借助其他函数, 把结果发到其他地方
rdd.foreach(x => x + "")
// 无返回值
rdd.foreach(x => print(x + " "))
// 1 2 3 4 5 6 7 8 9 10
在使用 fold(1) 之后, 结果增加了3的原因: 有几个分区就会局部汇总几次, 另外还有全局汇总, 每次汇总都会 + 初始值
3.7 Key-Value RDD操作
RDD整体上分为 Value 类型和 Key-Value 类型。
前面介绍的是 Value 类型的RDD的操作,实际使用更多的是 key-value 类型的RDD,也称为 PairRDD。
Value 类型RDD的操作基本集中在 RDD.scala 中;
key-value 类型的RDD操作集中在 PairRDDFunctions.scala 中;
前面介绍的大多数算子对 Pair RDD 都是有效的。Pair RDD还有属于自己的 Transformation、Action 算子;
3.7.1 创建Pair RDD
val arr = (1 to 10).toArray
val arr1 = arr.map(x => (x, x*10, x*100))
// rdd1 不是 Pair RDD
val rdd1 = sc.makeRDD(arr1)
rdd1.take(3)
// res1: Array[(Int, Int, Int)] = Array((1,10,100), (2,20,200), (3,30,300))
// rdd2 是 Pair RDD
val arr2 = arr.map(x => (x, (x*10, x*100)))
val rdd2 = sc.makeRDD(arr2)
rdd2.take(3)
// res2: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)))
rdd1.map(x => (x._1, (x._2, x._3))).collectAsMap // 这种方式可以转换成map
// res5: scala.collection.Map[Int,(Int, Int)] = Map(8 -> (80,800), 2 -> (20,200), 5 -> (50,500), 4 -> (40,400), 7 -> (70,700), 10 -> (100,1000), 1 -> (10,100), 9 -> (90,900), 3 -> (30,300), 6 -> (60,600))
3.7.2 Transformation操作
1、类似 map 操作
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
a.collect
// res6: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
// 使用 mapValues 更简洁
val b = a.mapValues(x=>1 to x) // 对每个value执行, value => 1 to value
b.collect
// Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
// 可使用map实现同样的操作
val c = a.map(x => (x._1, 1 to x._2))
c.collect
// Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
val c = a.map{case (k, v) => (k, 1 to v)} // 偏函数方式
c.collect
// Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
// flatMapValues 将 value 的值压平
val c = a.flatMapValues(x=>1 to x)
c.collect
// res10: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
val c = a.mapValues(x=>1 to x)
// Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
val c = a.mapValues(x=>1 to x).flatMap{case (k, v) => v.map(x => (k, x))} // 使用偏函数, 计算 v. 丢掉 k
c.collect
// res10: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
c.keys
// res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at keys at <console>:26
c.values
// res12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at values at <console>:26
c.map{case (k, v) => k}.collect // k留下, v丢掉
// res16: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
c.map{case (k, _) => k}.collect // k留下, 其他丢掉
// res17: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
c.map{case (_, v) => v}.collect // v留下, k丢掉
// res18: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
2、聚合操作【重要、难点】
PariRDD(k, v) 使用范围广,聚合
groupByKey / reduceByKey / foldByKey / aggregateByKey
combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底层实现, 版本区别
subtractByKey:类似于subtract,删掉 RDD 中键与 other RDD 中的键相同的元素
小案例:给定一组数据:("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16), 键值对的key表示图书名称,value表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15),
("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark",16)))
rdd.groupByKey().collect // 下面是scala的数据结构
// Array((scala,CompactBuffer(26, 24)), (hadoop,CompactBuffer(26, 23, 16)), (spark,CompactBuffer(12, 25, 15, 16, 23)))
// groupByKey
rdd.groupByKey().map(x=>(x._1, x._2.sum.toDouble/x._2.size)).collect // groupByKey不加括号也能执行
// Array((scala,25.0), (hadoop,21.666666666666668), (spark,18.2))
rdd.groupByKey().map{case (k, v) => (k, v.sum * 1.0/v.size)}.collect // 可读性更强
// 同上
rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect
// 同上
// reduceByKey
rdd.mapValues((_, 1)). // Array((spark,(12,1)), (hadoop,(26,1)) ... )
reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)). // Array((scala,(50,2)), (hadoop,(65,3)), (spark,(91,5)))
mapValues(x => (x._1.toDouble / x._2)).
collect()
// Array((scala,25.0), (hadoop,21.666666666666668), (spark,18.2))
// foldByKey, 与上面类似, 最大的区别是可以定义初值
rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {
(x._1+y._1, x._2+y._2)
}).mapValues(x=>x._1.toDouble/x._2).collect
// aggregateByKey
// aggregateByKey => 定义初值 + 分区内的聚合函数 + 分区间的聚合函数
rdd.mapValues((_, 1)).aggregateByKey((0,0))(
(x, y) => (x._1 + y._1, x._2 + y._2),
(a, b) => (a._1 + b._1, a._2 + b._2)
).mapValues(x=>x._1.toDouble / x._2).collect
// 初值(元组)与RDD元素类型(Int)可以不一致
rdd.aggregateByKey((0, 0))(
(x, y) => {println(s"x=$x, y=$y"); (x._1 + y, x._2 + 1)}, // x,y分别是前后的一个数, 这里的x类型被限定为 (_1, _2), 于是y传进来被加到 x._1上, x._2自增加1
(a, b) => {println(s"a=$a, b=$b"); (a._1 + b._1, a._2 + b._2)} // 这里的 a,b 分别是前后数, 格式都是 (_1, _2)
).mapValues(x=>x._1.toDouble/x._2).collect
// 分区内的合并与分区间的合并,可以采用不同的方式;这种方式是低效的!
rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())( // 需要导包
(x, y) => {x.append(y); x}, // 这里 x 要返回
(a, b) => {a++b}
).mapValues(v => v.sum.toDouble/v.size).collect
// combineByKey(理解就行)
rdd.combineByKey(
(x: Int) => {println(s"x=$x"); (x,1)},
(x: (Int, Int), y: Int) => {println(s"x=$x, y=$y");(x._1+y, x._2+1)},
(a: (Int, Int), b: (Int, Int)) => {println(s"a=$a, b=$b");(a._1+b._1, a._2+b._2)}
).mapValues(x=>x._1.toDouble/x._2).collect
// subtractByKey
val rdd1 = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark",15)))
val rdd2 = sc.makeRDD(Array(("spark", 100), ("hadoop", 300)))
rdd1.subtractByKey(rdd2).collect()
// 相同key消失
// subtractByKey
val rdd = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
val other = sc.makeRDD(Array(("a",10), ("b",20), ("c",30)))
rdd.subtractByKey(other).collect()
// 相同k消失, 和 v 无关
结论:效率相等用最熟悉的方法;groupByKey在一般情况下效率低,尽量少用 (95%以上不用)
初学:最重要的是实现;如果使用了groupByKey,寻找替换的算子实现;
groupByKey Shuffle过程中传输的数据量大,效率低
3、排序操作
sortByKey:sortByKey函数作用于PairRDD,对Key进行排序。在org.apache.spark.rdd.OrderedRDDFunctions 中实现:
val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"))
val b = sc.parallelize (1 to a.count.toInt)
val c = a.zip(b)
// 默认使用升序
c.sortByKey().collect
// Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))
// 使用降序
c.sortByKey(false).collect
// Array((wyp,1), (test,5), (iteblog,2), (com,3), (397090770,4))
4、join操作
cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin
val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
// (6,(CompactBuffer(),CompactBuffer(冯七)))
// (1,(CompactBuffer(Spark),CompactBuffer()))
// (2,(CompactBuffer(Hadoop),CompactBuffer()))
// (3,(CompactBuffer(Kylin),CompactBuffer(李四)))
// (4,(CompactBuffer(Flink),CompactBuffer(王五)))
// (5,(CompactBuffer(),CompactBuffer(赵六)))
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
// Array((3,(CompactBuffer(Kylin),CompactBuffer(李四))), (4,(CompactBuffer(Flink),CompactBuffer(王五))))
// 仿照源码实现join操作
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
).collect
// Array((3,(Kylin,李四)), (4,(Flink,王五)))
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
// Array((3,(Scala,20K)), (4,(Java,18K)))
rdd1.leftOuterJoin(rdd2).collect
// Array((1,(Spark,None)), (2,(Hadoop,None)), (3,(Scala,Some(20K))), (4,(Java,Some(18K))))
rdd1.rightOuterJoin(rdd2).collect
// Array((6,(None,10K)), (3,(Some(Scala),20K)), (4,(Some(Java),18K)), (5,(None,25K)))
rdd1.fullOuterJoin(rdd2).collect
// Array((6,(None,Some(10K))), (1,(Some(Spark),None)), (2,(Some(Hadoop),None)), (3,(Some(Scala),Some(20K))), (4,(Some(Java),Some(18K))), (5,(None,Some(25K))))
3.7.3 Action操作
collectAsMap / countByKey / lookup(key)
countByKey源码:
lookup(key):高效的查找方法,只查找对应分区的数据(如果该RDD有分区器的话)
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("1","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.lookup("1")
// res25: Seq[String] = WrappedArray(Spark, Java)
rdd2.lookup("3")
// res26: Seq[String] = WrappedArray(20K)
3.8 输入与输出
3.8.1 文件输入与输出
1、文本文件
数据读取:textFile(String)。可指定单个文件,支持通配符。
这样对于大量的小文件读取效率并不高,应该使用 wholeTextFiles
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
返回值RDD[(String, String)],其中 Key是文件的名称,Value是文件的内容
数据保存:saveAsTextFile(String)。指定的输出目录。
2、csv文件
读取 CSV(Comma-Separated Values)/TSV(Tab-Separated Values) 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后通过将每一行进行解析实现对CSV的读取。
CSV/TSV 数据的输出也是需要将结构化RDD通过相关的库转换成字符串RDD,然后使用 Spark 的文本文件 API 写出去。
3、json文件
如果 JSON 文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
JSON数据的输出主要是通过在输出之前将由结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件API 写出去。
json文件的处理使用SparkSQL最为简洁。
4、SequenceFile
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。 Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用:sequenceFile[keyClass, valueClass];
调用 saveAsSequenceFile(path) 保存PairRDD,系统将键和值能够自动转为Writable类型。
5、对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。
通过 objectFile[k,v](path) 接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
3.8.2 JDBC
详见综合案例
3.9 算子综合应用案例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ch</groupId>
<artifactId>spark_code</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.10</scala.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.9.2</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 第三方的日期时间包 -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.7</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1、WordCount - scala
备注:打包上传服务器运行
package com.ch.sparkcore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ScalaWordCount {
def main(args: Array[String]): Unit = {
// 1、创建SparkContext, 必须要给地址和应用名称
//val conf = new SparkConf().setMaster("local").setAppName("WordCount")
// 在集群运行,要用下面的设置
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// 2、读本地文件(集群运行:输入参数)
//val lines = sc.textFile("data/wc.txt")
// 集群运行,要用下面的设置
val lines: RDD[String] = sc.textFile(args(0))
// 3、RDD转换
val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))
val wordsMap: RDD[(String, Int)] = words.map(x => (x, 1))
val result: RDD[(String, Int)] = wordsMap.reduceByKey(_ + _)
// 4、输出
result.foreach(println)
// 5、关闭SparkContext
sc.stop()
// 6、打包,使用spark-submit提交集群运行
// spark-submit --master local[*] --class com.ch.sparkcore.ScalaWordCount \
// original-spark_code-1.0-SNAPSHOT.jar /wcinput/*
// spark-submit --master yarn --class com.ch.sparkcore.ScalaWordCount \
// original-spark_code-1.0-SNAPSHOT.jar /wcinput/*
}
}
2、WordCount - java
Spark提供了:Scala、Java、Python、R语言的API;
对 Scala 和 Java 语言的支持最好;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class JavaWordCount {
public static void main(String[] args) {
// 1 创建 JavaSparkContext
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
jsc.setLogLevel("warn");
// 2 生成RDD
JavaRDD<String> lines = jsc.textFile("file:///data\\wc.txt");
// 3 RDD转换
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
JavaPairRDD<String, Integer> wordsMap = words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> results = wordsMap.reduceByKey((x, y) -> x + y);
// 4 结果输出
results.foreach(elem -> System.out.println(elem));
// 5 关闭SparkContext
jsc.stop();
}
}
备注:
Spark入口点:JavaSparkContext
Value-RDD:JavaRDD;key-value RDD:JavaPairRDD
JavaRDD 和 JavaPairRDD转换
-
JavaRDD => JavaPairRDD:通过mapToPair函数
-
JavaPairRDD => JavaRDD:通过map函数转换
lambda表达式使用 ->
3、计算圆周率
package com.ch.sparkcore
import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random
object SparkPi {
def main(args: Array[String]): Unit = {
// 1、创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// 如果没有输入分区, 默认分为100个区
val slices = if (args.length > 0) args(0).toInt else 100
val N = 100000000
// 2、生成、转换RDD
// 生成 0~1 内的坐标
val n = sc.makeRDD(1 to N, slices)
.map(idx => {
val (x, y) = (random, random)
if (x*x + y*y <= 1) 1 else 0
}).sum()
// 3、结果输出
val pi = 4.0 * n / N
println(s"pi = $pi")
// 5、关闭SparkContext
sc.stop()
}
}
4、广告数据统计
数据格式:timestamp province city userid adid 时间点 省份 城市 用户 广告
需求: 1、统计每一个省份点击TOP3的广告ID 2、统计每一个省份每一个小时的TOP3广告ID
package com.ch.sparkcore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Adstat {
def main(args: Array[String]): Unit = {
// 1、创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// 获取前N个
val N = 3
// 2、生成RDD
val lines: RDD[String] = sc.textFile("data\\advert.log")
// 数据格式:时间点 省份 城市 用户 广告
// 3、RDD转换
println(" 1、统计每一个省份点击TOP3的广告ID=========================================================")
val stat1RDD: RDD[(String, String)] = lines.map { line =>
val fields: Array[String] = line.split("\\s+")
(fields(1), fields(4))
}
// ((Henan,5),1) ((Henan,5),1) ((Henan,5),1) ....
// 按省份、广告汇总
val reduce1RDD: RDD[((String, String), Int)] =
stat1RDD.map { case (provice, adid) => ((provice, adid), 1) }
.reduceByKey(_ + _) // ((Henan,5),2189) ((Hebei,7),2250) ((Henan,0),2237) ....
// 对以上汇总信息求Top3
reduce1RDD.map{case ((provice, adid), count) => (provice, (adid, count))}
.groupByKey()
.mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(N).map(_._1).mkString(":"))
.foreach(println) // (Hebei,7:8:3) (Hunan,5:1:2) (Hubei,8:6:2) (Henan,6:0:4) (Jiangsu,7:3:6)
println("2、统计每一个省份每一个小时的TOP3广告ID =========================================================")
// 0 1 4 时间点 省份 广告
// 使用field返回需要的数据
lines.map { line =>
val fields: Array[String] = line.split("\\s+")
((getHour(fields(0)), fields(1), fields(4)), 1)
}.reduceByKey(_+_)
// ((0,Hubei,5),2204)
// ((0,Henan,9),2180)
// ((0,Jiangsu,0),2147) ......
lines.map { line =>
val fields: Array[String] = line.split("\\s+")
((getHour(fields(0)), fields(1), fields(4)), 1)
}.reduceByKey(_+_)
.map{case ((hour, provice, adid), count) => ((provice, hour), (adid, count))}
// ((Hubei,0),(5,2204))
// ((Henan,0),(9,2180))
// ((Jiangsu,0),(0,2147))
lines.map { line =>
val fields: Array[String] = line.split("\\s+")
((getHour(fields(0)), fields(1), fields(4)), 1)
}.reduceByKey(_+_)
.map{case ((hour, provice, adid), count) => ((provice, hour), (adid, count))}
.groupByKey()
.mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(N).map(_._1).mkString(":"))
.collect.foreach(println)
// ((Henan,0),6:0:4) ((Hubei,0),8:6:2) ((Hunan,0),5:1:2) ((Jiangsu,0),7:3:6) ((Hebei,0),7:8:3)
// 5、关闭SparkContext
sc.stop()
}
// 下面的功能函数, 必须在外面测试了之后再拿来用
// 在 spark core 程序中一定不要使用java8以前额时间日期类型, 因为线程不安全
// 使用第三方的时间日期类型包, 一定要确认其实线程安全的 !!!!
// 不管知不知道函数的返回值, 最好都要写上返回值
def getHour(str: String): Int = {
import org.joda.time.DateTime
val dt = new DateTime(str.toLong)
dt.getHourOfDay
}
}
在Java 8出现前的很长时间内成为Java中日期时间处理的事实标准,用来弥补JDK的不足。
Joda 类具有不可变性,它们的实例无法被修改。(不可变类的一个优点就是它们是线程安全的)
在 Spark Core 程序中使用时间日期类型时,不要使用 Java 8 以前的时间日期类型,线程不安全。
5、找共同好友
原始数据:
100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100
第一列表示用户,后面的表示该用户的好友
要求:
1、查找两两用户的共同好友
2、最后的结果按前两个id号有序排序
package com.ch.sparkcore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object FindFriends {
def main(args: Array[String]): Unit = {
// 创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("data\\fields.dat")
println("// 方法一:核心思想利用笛卡尔积求两两的好友,然后去除多余的数据")
val friendsRDD: RDD[(String, Array[String])] = lines.map { line =>
val fields: Array[String] = line.split(",")
val userId = fields(0).trim
val friends: Array[String] = fields(1).trim.split("\\s+")
(userId, friends)
}
friendsRDD.cartesian(friendsRDD) //.collect().foreach(println)
//((100,[Ljava.lang.String;@285f38f6),(200,[Ljava.lang.String;@3ab6678b))
//((200,[Ljava.lang.String;@2b59501e),(100,[Ljava.lang.String;@476e8796))......
friendsRDD.cartesian(friendsRDD)
.filter { case ((id1, _), (id2, _)) => id1 < id2 }
// .collect().foreach(x => println(x._1._1, x._1._2.toBuffer, x._2._1, x._2._2.toBuffer))
//(100,ArrayBuffer(200, 300, 400, 500, 600),200,ArrayBuffer(100, 300, 400))
//(100,ArrayBuffer(200, 300, 400, 500, 600),300,ArrayBuffer(100, 200, 400, 500))......
friendsRDD.cartesian(friendsRDD)
.filter { case ((id1, _), (id2, _)) => id1 < id2 }
.map { case ((id1, friends1), (id2, friends2)) =>
//((id1, id2), friends1.toSet & friends2.toSet)
((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
}.sortByKey()
// .collect().foreach(println)
//((100,200),ArrayBuffer(300, 400))
//((100,300),ArrayBuffer(200, 400, 500))......
println("// 方法二:消除笛卡尔积,更高效。========================================")
// 核心思想:将数据变形,找到两两的好友, 再执行数据的合并
val value: RDD[(String, Array[String])] = friendsRDD.flatMapValues {
friends => friends.combinations(2)
}
//value.collect.foreach(x => println(x._1, x._2.toBuffer))
//(100,ArrayBuffer(200, 300))
//(100,ArrayBuffer(200, 400))
//(100,ArrayBuffer(200, 500))
friendsRDD.flatMapValues(friends => friends.combinations(2))
.map(x => (x._2.mkString(" & "), x._1))
// .collect.foreach(println)
//(200 & 300,100)
//(200 & 400,100)
//(200 & 500,100).....
println("=====================================================")
friendsRDD.flatMapValues(friends => friends.combinations(2))
// .map(x => (x._2.mkString(" & "), Set(x._1)))
.map { case (k, v) => (v.mkString(" & "), Set(k)) }
.reduceByKey(_ | _)
.sortByKey()
.collect().foreach(println)
//(100 & 200,Set(300, 400))
//(100 & 300,Set(200, 400, 500)).....
// 知识点:flatMapValues / combinations / 数据的变形 / reduceByKey / 集合的操作
// combinations 求的是数组中的排列
// 关闭SparkContext
sc.stop()
}
}
val s1 = (1 to 5).toSet
val s2 = (3 to 8).toSet
// 交。intersect
println(s1 & s2)
// 并。union
println(s1 | s2)
// 差。diff
println(s1 &~ s2)
6、Super WordCount
要求:将单词全部转换为小写,去除标点符号(难),去除停用词(难);最后按照 count 值降序保存到文件,同时将全部结果保存到MySQL(难);标点符号和停用词可以自定义。
停用词:语言中包含很多功能词。与其他词相比,功能词没有什么实际含义。最普遍的功能词是[限定词](the、a、an、that、those),介词(on、in、to、from、over等)、代词、数量词等。
Array[(String, Int)] => scala jdbc => MySQL
package com.ch.sparkcore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SuperWordCount1 {
private val stopWords = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
private val punctuation = "[\\)\\.,:;'!\\?]"
def main(args: Array[String]): Unit = {
// 创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val acc3 = sc.collectionAccumulator[String]("allWords")
// RDD转换
// 换为小写,去除标点符号(难),去除停用词(难)
val lines: RDD[String] = sc.textFile("data\\swc.dat")
lines.flatMap(_.split("\\s+"))
.map(_.toLowerCase)
.map(_.replaceAll(punctuation, ""))
.filter(word => !stopWords.contains(word) && word.trim.length>0)
.map((_, 1))
.reduceByKey(_+_)
.sortBy(_._2, false)
.collect.foreach(println)
// 结果输出
// 关闭SparkContext
sc.stop()
}
}
create table wordcount(word varchar(30), count int);
package com.ch.sparkcore
import java.sql.{Connection, DriverManager, PreparedStatement}
object JDBCDemo {
def main(args: Array[String]): Unit = {
// 定义结果集Array[(String, Int)]
val str = "hadoop spark java scala hbase hive sqoop hue tez atlas datax grinffin zk kafka"
val result: Array[(String, Int)] = str.split("\\s+").zipWithIndex
// 定义参数
val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
val username = "hive"
val password = "12345678"
// jdbc 保存数据
var conn: Connection = null
var stmt: PreparedStatement = null
val sql = "insert into wordcount values (?, ?)"
try {
conn = DriverManager.getConnection(url, username, password)
stmt = conn.prepareStatement(sql)
result.foreach{case (k, v) =>
stmt.setString(1, k)
stmt.setInt(2, v)
stmt.executeUpdate()
}
}catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}
}
未优化的程序:使用 foreach 保存数据,要创建大量的链接
package com.ch.sparkcore
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SuperWordCount2 {
private val stopWords = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
private val punctuation = "[\\)\\.,:;'!\\?]"
private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
private val username = "hive"
private val password = "12345678"
def main(args: Array[String]): Unit = {
// 创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// RDD转换
// 换为小写,去除标点符号(难),去除停用词(难)
val lines: RDD[String] = sc.textFile("data\\swc.dat")
val resultRDD: RDD[(String, Int)] = lines.flatMap(_.split("\\s+"))
.map(_.toLowerCase)
.map(_.replaceAll(punctuation, ""))
.filter(word => !stopWords.contains(word) && word.trim.length > 0)
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
// 结果输出
resultRDD.saveAsTextFile("data\\superwc")
// 使用 foreach,对每条记录创建连接
resultRDD.foreach{case (k, v) =>
var conn: Connection = null
var stmt: PreparedStatement = null
val sql = "insert into wordcount values (?, ?)"
try {
conn = DriverManager.getConnection(url, username, password)
stmt = conn.prepareStatement(sql)
stmt.setString(1, k)
stmt.setInt(2, v)
stmt.executeUpdate()
}catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}
// 关闭SparkContext
sc.stop()
}
}
优化后的程序:使用 foreachPartition 保存数据,一个分区创建一个链接;cache RDD
package com.ch.sparkcore
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SuperWordCount3 {
private val stopWords: Array[String] = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
private val punctuation = "[\\)\\.,:;'!\\?]"
private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
private val username = "hive"
private val password = "12345678"
def main(args: Array[String]): Unit = {
// 创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// RDD转换
// 换为小写,去除标点符号(难),去除停用词(难)
val lines: RDD[String] = sc.textFile("file:///C:\\Project\\LagouBigData\\data\\swc.dat")
val resultRDD: RDD[(String, Int)] = lines.flatMap(_.split("\\s+"))
.map(_.toLowerCase)
.map(_.replaceAll(punctuation, ""))
.filter(word => !stopWords.contains(word) && word.trim.length > 0)
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
//resultRDD.foreach(println)
// ?????这里是干嘛的?
resultRDD.cache()
// 结果输出
resultRDD.saveAsTextFile("file:///C:\\Project\\LagouBigData\\data\\superwc")
// 使用 foreachPartition,对每条记录创建连接
resultRDD.foreachPartition { iter => saveAsMySQL(iter)}
// 关闭SparkContext
sc.stop()
}
def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var stmt: PreparedStatement = null
val sql = "insert into wordcount values (?, ?)"
try {
conn = DriverManager.getConnection(url, username, password)
stmt = conn.prepareStatement(sql)
iter.foreach { case (k, v) =>
stmt.setString(1, k)
stmt.setInt(2, v)
stmt.executeUpdate()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}
}
// 在SparkSQL中有内建的访问MySQL的方法,调用非常方便
// SparkCore、SQL不支持的外部存储, 可以使用上述方法
备注:
-
SparkSQL有方便的读写MySQL的方法,给参数直接调用即可;
-
但以上掌握以上方法非常有必要,因为SparkSQL不是支持所有的类型的数据库