Spark Java 版本的 WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/**
 * Word-count example written against the Spark Java API.
 *
 * <p>Reads the classpath resource {@code /wc.txt}, splits every line on single spaces, counts
 * how often each non-empty word occurs and prints the counts ordered by word, descending.
 */
public class wordcount{

    /**
     * Runs the word count on a local Spark master ({@code local[*]}) and prints one
     * {@code word : count} line per distinct word to standard output.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if {@code /wc.txt} cannot be found on the classpath
     */
    public static void main(String[] args) {

        // Resolve the input before starting Spark so that a missing resource fails with a
        // clear message instead of a NullPointerException, and no context is left running.
        URL url = wordcount.class.getResource("/wc.txt");
        if (url == null) {
            throw new IllegalStateException("Resource /wc.txt not found on the classpath");
        }

        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]").setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lineRDD = sc.textFile(url.getPath());

            // Break every line into space-separated tokens.
            JavaRDD<String> tokenRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    List<String> tokens = Arrays.asList(line.split(" "));
                    return tokens.iterator();
                }
            });

            // Blank lines, leading spaces and consecutive spaces make split(" ") produce
            // empty tokens; drop them so they are not counted as a word.
            JavaRDD<String> wordRDD = tokenRDD.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String token) throws Exception {
                    return !token.isEmpty();
                }
            });

            // Pair each word with an initial count of 1.
            JavaPairRDD<String, Integer> pairRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            // Sum the counts per word.
            JavaPairRDD<String, Integer> resultRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Collect to the driver, ordered by word in descending order, and print.
            List<Tuple2<String, Integer>> counts = resultRDD.sortByKey(false).collect();
            for (Tuple2<String, Integer> wc : counts) {
                System.out.println(wc._1 + " : " + wc._2);
            }
        } finally {
            // Always release the Spark context, even when the job fails.
            sc.stop();
        }
    }
}

太烦了

上一篇:Windows下的api 以及 C语言的一些api


下一篇:Linux如何修改文件/文件夹内所有文件的权限