数据算法——Spark的TopN实现

1.scala实现:

/**
  * TOPN:维持一个定长数组,先塞满,然后把剩下的逐个对应数组中的元素,
  * 有大的就把小的弹出去,大的加进来,并移动位置来重新排序
  */
object TopN {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("topn").getOrCreate()
    val sc = session.sparkContext
    val lines = sc.textFile("data/scores")
    val pairInfo = lines.map(one=>{(one.split(" ")(0),one.split(" ")(1).toInt)})

    pairInfo.groupByKey().map(tp=>{
      val className = tp._1
      val iterator = tp._2.iterator
      val top3 = new Array[Int](3)//定长
      val loop = new Breaks//相当于java中的break
      while (iterator.hasNext){
        val currentOne = iterator.next()
        loop.breakable{ //里面break,就回到这
          //整个过程都将围绕着这个长度为3的定长数组展开
          for (i<-0 until top3.size){
            //先把定长数组塞满
            if (top3(i)==0){
              top3(i)=currentOne
              loop.break()
            }else if(currentOne>top3(i)){//开始拿剩下的元素跟数组中的比较0、1、2
              for (j<-2 until(i,-1)){
                top3(j)=top3(j-1)//开始挪地方了,比如:i=0,表示新值要插入在0号位,把1号位的值挪到2号,0号位的旧值挪到1号位
              }
              top3(i)=currentOne//把这个新成员插入进来
              loop.break()//插入一次,跳出重新再来下一轮
            }
          }
        }
      }
      (className,top3.toBuffer)
    }).collect()
      .foreach(println)
  }
}

2.Java实现:

public class TopN {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local");
        sparkConf.setAppName("topn");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = sc.textFile("data/scores");
        JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s.split(" ")[0], Integer.valueOf(s.split(" ")[1]));
            }
        });
        pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tp) throws Exception {
                String className = tp._1;
                Iterator<Integer> iter = tp._2.iterator();
                Integer[] top3 = new Integer[3];
                while (iter.hasNext()){
                    Integer currentOne = iter.next();
                    for (int i = 0; i < 3; i++) {
                        if (top3[i]==null){
                            top3[i]=currentOne;
                            break;
                        }else if(currentOne>top3[i]){
                            for (int j = 2; j >i; j--) {
                                top3[j]=top3[j-1];
                            }
                            top3[i]=currentOne;
                            break;
                        }
                    }
                }
                for (Integer i:top3){
                    System.out.println("className="+className+";value="+i);
                }
            }
        });
    }
}
上一篇:Mapreduce最定义groupComparator实现分组求取topN和其他的参数以及调优


下一篇:集合