Intellij idea配置Spark开发环境,统计哈姆雷特词频(2)

idea 新建maven 项目

  1. 输入maven坐标


    maven 坐标
  2. 编辑maven文件


    Spark 体系

中间层Spark,即核心模块Spark Core,必须在maven中引用。
编译Spark还要声明java8编译工具。

<properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

idea自动加载引用,在窗口左侧Project导航栏-->External Libraries中看到引用org.apache.spark中spark-core_2.11-2.1.0.jar文件。


idea Externel Libraries

注:Spark Streaming是流式计算框架、SparkSQL数据库工具、Mlib机器学习框架、GraphX图计算工具。

Java 8 lambda函数风格的wordCount

//定义单词总数累加器、和停用词累加器
Accumulator countTotal = jsc.accumulator(0);
Accumulator stopTotal = jsc.accumulator(0);
// 文件初始化RDD
JavaRDD<String> stopword = jsc.textFile("data/text/stopword.txt");
JavaRDD<String> rdd = jsc.textFile("data/text/Hamlet.txt");
// RDD 转换为List
List<String> stopWordList = stopword.collect();
// Broadcast 广播变量,task共享executor的变量
Broadcast<List<String>> broadcastedStopWordSet = jsc.broadcast(stopWordList);

rdd.filter(l->l.length()>0)
        .flatMap(l-> Arrays.asList(l.trim().split(" ")).iterator()) 
        // 将line分割展成词向量,词向量在连接,返回Rdd<String>
        .map(v->v.replaceAll("['.,:;?!-]", "").toLowerCase())
        // 特殊字符处理, Rdd<String>
        .filter(v->{
            boolean isStop = false;
            countTotal.add(1);
            if(broadcastedStopWordSet.value().contains(v)){
                stopTotal.add(1);
                isStop = true;
            }
            return !isStop;
        })
        //遍历总数计数、停用词计数,过滤停止词, Rdd<String>
        .mapToPair(v-> new Tuple2<>(v,1))
        .reduceByKey((v1,v2)->v1+v2)
        //统计个数
        .mapToPair(p-> new Tuple2<>(p._2,p._1))
        .sortByKey(false)
        //排序
        .take(10).forEach(e->{
            System.out.println(e._2+":"+e._1);
        });
  1. 将line分割展成词向量,词向量连接,flatmap返回Rdd<String>
  2. 特殊字符处理,返回 Rdd<String>
  3. 遍历总数计数、停用词计数,过滤停止词, 返回Rdd<String>
  4. Reduce Rdd<String,1>,返回Rdd<String,total>
  5. 排序 SortByKey,返回 Rdd<String,total>

后期有更多案例介绍Java 8 lambda风格的RDD开发

上一篇:Java并发面试,幸亏有点道行,不然又被忽悠了


下一篇:基于Yarn的Spark环境,统计哈姆雷特词频(1)