idea 新建maven 项目
-
输入maven坐标
-
编辑maven文件
中间层Spark,即核心模块Spark Core,必须在maven中引用。
编译Spark还要声明java8编译工具。
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
idea自动加载引用,在窗口左侧Project导航栏-->External Libraries中看到引用org.apache.spark中spark-core_2.11-2.1.0.jar文件。
注:Spark Streaming是流式计算框架、SparkSQL数据库工具、Mlib机器学习框架、GraphX图计算工具。
Java 8 lambda函数风格的wordCount
//定义单词总数累加器、和停用词累加器
Accumulator countTotal = jsc.accumulator(0);
Accumulator stopTotal = jsc.accumulator(0);
// 文件初始化RDD
JavaRDD<String> stopword = jsc.textFile("data/text/stopword.txt");
JavaRDD<String> rdd = jsc.textFile("data/text/Hamlet.txt");
// RDD 转换为List
List<String> stopWordList = stopword.collect();
// Broadcast 广播变量,task共享executor的变量
Broadcast<List<String>> broadcastedStopWordSet = jsc.broadcast(stopWordList);
rdd.filter(l->l.length()>0)
.flatMap(l-> Arrays.asList(l.trim().split(" ")).iterator())
// 将line分割展成词向量,词向量在连接,返回Rdd<String>
.map(v->v.replaceAll("['.,:;?!-]", "").toLowerCase())
// 特殊字符处理, Rdd<String>
.filter(v->{
boolean isStop = false;
countTotal.add(1);
if(broadcastedStopWordSet.value().contains(v)){
stopTotal.add(1);
isStop = true;
}
return !isStop;
})
//遍历总数计数、停用词计数,过滤停止词, Rdd<String>
.mapToPair(v-> new Tuple2<>(v,1))
.reduceByKey((v1,v2)->v1+v2)
//统计个数
.mapToPair(p-> new Tuple2<>(p._2,p._1))
.sortByKey(false)
//排序
.take(10).forEach(e->{
System.out.println(e._2+":"+e._1);
});
- 将line分割展成词向量,词向量连接,flatmap返回Rdd<String>
- 特殊字符处理,返回 Rdd<String>
- 遍历总数计数、停用词计数,过滤停止词, 返回Rdd<String>
- Reduce Rdd<String,1>,返回Rdd<String,total>
- 排序 SortByKey,返回 Rdd<String,total>