package com.swust.seltop; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.*; /** * * @author 雪瞳 * @Slogan 时钟尚且前行,人怎能再此止步! * @Function 分组取TopN * */ public class SortTopN { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("top"); JavaSparkContext jsc = new JavaSparkContext(conf); jsc.setLogLevel("Error"); String inputPath = "./data/top.txt"; JavaRDD<String> input = jsc.textFile(inputPath,1); //top10类 JavaPairRDD<String, Integer> pairRDD = input.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String line) throws Exception { // 14 cat1 cat1 String[] splits = line.split(" "); Tuple2<String, Integer> tp = new Tuple2<>(splits[0]+"\t"+splits[1]+"\t"+splits[2], Integer.parseInt(splits[0])); return tp; } }); //为每一个分区创建一个本地 top10列表 JavaRDD<SortedMap<Integer, String>> singleTop10 = pairRDD.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() { @Override public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iterator) throws Exception { SortedMap<Integer, String> top = new TreeMap<>(); while (iterator.hasNext()) { Tuple2<String, Integer> next = iterator.next(); top.put(next._2, next._1); //保留正序前10 if (top.size() > 10) { top.remove(top.firstKey()); } } List<SortedMap<Integer, String>> list = Collections.singletonList(top); return list.iterator(); } }); //收集所有本地的top10 列表 List<SortedMap<Integer, String>> singleResult = singleTop10.collect(); SortedMap<Integer,String> finalResult = new TreeMap<>(); for (SortedMap<Integer, String> elements : singleResult){ //遍历map并将数据存储到finalResult内 Set<Map.Entry<Integer, String>> entries = elements.entrySet(); for (Map.Entry<Integer,String> entry:entries){ finalResult.put(entry.getKey(),entry.getValue()); } if (finalResult.size()>10){ finalResult.remove(finalResult.firstKey()); } } //输出结果 for (Map.Entry<Integer,String> entry : finalResult.entrySet()){ System.err.println(entry.getKey()+"------"+entry.getValue()); } // 替代方案 使用reduce进行数据迭代 /*singleTop10.reduce(new Function2<SortedMap<Integer, String>, SortedMap<Integer, String>, SortedMap<Integer, String>>() { @Override public SortedMap<Integer, String> call(SortedMap<Integer, String> sm1, SortedMap<Integer, String> sm2) throws Exception { SortedMap<Integer,String> top10 = new TreeMap<>(); for (Map.Entry<Integer,String> entry : sm1.entrySet()){ top10.put(entry.getKey(),entry.getValue()); if (top10.size()>10){ top10.remove(top10.firstKey()); } } for (Map.Entry<Integer,String> entry : sm2.entrySet()){ top10.put(entry.getKey(),entry.getValue()); if (top10.size()>10){ top10.remove(top10.firstKey()); } } return top10; } });*/ } }