Spark开发wordcount程序

1、java版本(spark-2.1.0)

package chavin.king;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import java.util.Arrays;

import java.util.Iterator;

import org.apache.spark.SparkConf;

public class WordCount {

public static void main(String[] args) {
         // TODO Auto-generated method stub

//初始化spark应用
         SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
        
         //读取文件
         JavaRDD<String> lines = sc.textFile("E://test//spark_wc.txt");

//将每一行切割成单词
         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

public Iterator<String> call(String line) throws Exception {
                 return Arrays.asList(line.split(" ")).iterator();
             }

});

//将每个单词映射成(word,1)格式
         JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

public Tuple2<String, Integer> call(String word) throws Exception {
                 return new Tuple2<String, Integer>(word, 1);
             }

});

//计算每个单词出现次数
         JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

public Integer call(Integer v1, Integer v2) throws Exception {
                 return v1 + v2;
             }

});

//打印输出
         wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

public void call(Tuple2<String, Integer> wordCount) throws Exception {
                 System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
             }

});

//关闭SparkContext
         sc.close();

}

}

2、scala版本

package chavin.king

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object WordCountLocal {

def main(args: Array[String]) {
    
     val conf = new SparkConf().setAppName("WordCount").setMaster("local")
     val sc = new SparkContext(conf)

val lines = sc.textFile("E://test//spark_wc.txt", 1)
     val words = lines.flatMap { line => line.split(" ") }
     val pairs = words.map { word => (word, 1) }
     val wordCounts = pairs.reduceByKey { _ + _ }

wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))
    
   }

}

上一篇:一篇很全面的freemarker教程 前端工程师必备


下一篇:[VC6,VC9] [ts,nts,deb] [rpm,msi] 你需要下载什么格式的文件