1.Scala实现:
/**
 * Top-N per group: keep a small fixed-capacity array sorted in descending order.
 * Each incoming value is inserted at the first position whose occupant is smaller
 * (or that is still unused); smaller values shift right and the last one drops off
 * once the array is full.
 */
object TopN {

  /**
   * Returns the `n` largest values produced by `values`, in descending order.
   *
   * The result holds fewer than `n` elements when `values` yields fewer than `n`
   * items. An explicit fill counter tracks how many slots are in use, so zero and
   * negative values are treated like any other value instead of being mistaken
   * for an empty slot.
   *
   * @param values the values to scan; consumed by this call
   * @param n      maximum number of values to keep (defaults to 3)
   * @return the kept values, largest first
   */
  private def topN(values: Iterator[Int], n: Int = 3): Array[Int] = {
    val best = new Array[Int](n)
    var filled = 0 // number of slots of `best` that hold real values
    for (candidate <- values) {
      // Find the first slot that is unused or holds a strictly smaller value.
      // Equal values stay behind earlier ones.
      var pos = 0
      while (pos < filled && candidate <= best(pos)) pos += 1
      if (pos < n) {
        // Shift the tail one step to the right; when the array is already full
        // the last element is dropped.
        var j = math.min(filled, n - 1)
        while (j > pos) {
          best(j) = best(j - 1)
          j -= 1
        }
        best(pos) = candidate
        if (filled < n) filled += 1
      }
    }
    best.take(filled)
  }

  /**
   * Reads `data/scores`, where each line is split on a single space into a key
   * (first field) and an integer value (second field), groups the values by key
   * and prints the top 3 values of every key.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("topn").getOrCreate()
    try {
      val lines = session.sparkContext.textFile("data/scores")
      val pairInfo = lines.map { line =>
        // Split once and reuse the fields.
        val fields = line.split(" ")
        (fields(0), fields(1).toInt)
      }
      pairInfo.groupByKey()
        .map { case (className, scores) => (className, topN(scores.iterator).toBuffer) }
        .collect()
        .foreach(println)
    } finally {
      // Release the session even if the job fails.
      session.stop()
    }
  }
}
2.Java实现:
/**
 * Top-N per group: keeps a small fixed-capacity array sorted in descending order
 * and inserts every incoming value at the first position whose occupant is
 * smaller (or that is still unused).
 */
public class TopN {

    /**
     * Returns the {@code n} largest values produced by {@code values}, in
     * descending order. The returned array is shorter than {@code n} when the
     * iterator yields fewer than {@code n} items, so it never contains
     * {@code null} padding.
     *
     * @param values the values to scan; consumed by this call
     * @param n      maximum number of values to keep
     * @return the kept values, largest first
     */
    private static Integer[] topN(Iterator<Integer> values, int n) {
        Integer[] best = new Integer[n];
        int filled = 0; // number of slots of best that hold real values
        while (values.hasNext()) {
            Integer candidate = values.next();
            // Find the first slot that is unused or holds a strictly smaller value.
            int pos = 0;
            while (pos < filled && candidate <= best[pos]) {
                pos++;
            }
            if (pos < n) {
                // Shift the tail one step to the right; when the array is already
                // full the last element is dropped.
                for (int j = Math.min(filled, n - 1); j > pos; j--) {
                    best[j] = best[j - 1];
                }
                best[pos] = candidate;
                if (filled < n) {
                    filled++;
                }
            }
        }
        Integer[] result = new Integer[filled];
        System.arraycopy(best, 0, result, 0, filled);
        return result;
    }

    /**
     * Reads {@code data/scores}, where each line is split on a single space into
     * a key (first field) and an integer value (second field), groups the values
     * by key and prints the top 3 values of every key.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local");
        sparkConf.setAppName("topn");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        try {
            JavaRDD<String> lines = sc.textFile("data/scores");
            JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    // Split once and reuse the fields.
                    String[] fields = s.split(" ");
                    return new Tuple2<String, Integer>(fields[0], Integer.valueOf(fields[1]));
                }
            });
            pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> tp) throws Exception {
                    String className = tp._1;
                    // Only the slots that were actually filled are printed.
                    for (Integer i : topN(tp._2.iterator(), 3)) {
                        System.out.println("className="+className+";value="+i);
                    }
                }
            });
        } finally {
            // Release the context even if the job fails.
            sc.stop();
        }
    }
}