Spark中术语解释
Application:基于Spark的应用程序,包含了driver程序和 集群上的executor
DriverProgram:运行main函数并且新建SparkContext的程序
ClusterManager:在集群上获取资源的外部服务(例如 standalone,Mesos,Yarn )
WorkerNode:集群中任何可以运行应用用代码的节点
Executor:是在一个workernode上为某应用用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用用都有各自自独立的executors
Task:被送到某个executor上的执行单元
累加器
在Driver端定义:sc.longAccumulator
在算子内部进行累加
在Driver端汇总
累加器支持在所有不同节点之间进行累加计算
广播变量
在Driver端广播:sc.broadcast()
在算子内部取用,不能进行修改
广播到每个Executor中
用完记得“销毁”
如果直接将数据封装task中,会产生很多副本,增加网络传输的数据量,降低效率,因为task的数量远大于Executor的数量
import java.lang
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object Demo18ShareVariable {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(" ").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("spark/data/words2.txt")
//需求:
// 以词频统计WordCount程序为例,处理的数据word2.txt所示,包括非单词符号,
// 做WordCount的同时统计出特殊字符的数量
//创建一个计数器/累加器
val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
//定义一个特殊字符集合
val ruleList: List[String] = List(",", ".", "!", "#", "$", "%", "(", ")")
//将集合作为广播变量广播到各个节点
val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
//TODO 2.transformation
val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
.flatMap(_.split("\\s+"))
.filter(ch => {
//获取广播数据
val list: List[String] = broadcast.value
if (list.contains(ch)) { //如果是特殊字符
mycounter.add(1)
false
} else { //是单词
true
}
}).map((_, 1))
.reduceByKey(_ + _)
//TODO 3.sink/输出
wordcountResult.foreach(println)
val chResult: lang.Long = mycounter.value //特殊字符数量
println("特殊字符的数量:"+chResult)
}
}