TopN: find the three best scores among all of each person's scores

1. Data source

xiaoliu 64
xiaoliu 69
xiaoliu 79
xiaoji 98
xiaoliu 100
xiaoji 99
xiaowang 27
xiaowang 69
xiaowang 64
xiaozhang 67
xiaozhang 38
xiaozhang 93
xiaozhang 29
xiaozhang 85
xiaoliu 19
xiaoliu 53
xiaoliu 93
xiaoji 90
xiaoji 85
xiaoji 73
xiaoji 64
xiaoji 39
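
Each line is one record: a name and a score separated by a single space. The parsing step below splits on that space and discards malformed lines.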

2. PySpark implementation

2.1 Method 1: groupByKey with salted keys

from pyspark import SparkContext, SparkConf
import os
import random

# Point PySpark at a local Spark installation (raw strings avoid
# backslash-escape problems in Windows paths)
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = r"E:\ProgramFiles\spark-2.2.1-bin-2.6.0-cdh5.14.2"
    os.environ['PYSPARK_PYTHON'] = r"E:\ProgramFiles\Anaconda3\python.exe"

# 1. Build the Spark context
config = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("TopNDemo")

sc = SparkContext.getOrCreate(config)

# 2. Read the data into an RDD
path = "G:/liu/topn.txt"
rdd = sc.textFile(path)

# 2.1 Step 1: convert each raw line from String to (String, Int)
rdd1 = rdd \
    .map(lambda line: line.split(" ")) \
    .filter(lambda arr: len(arr) == 2) \
    .map(lambda arr: (arr[0], int(arr[1])))


# 2.2 Step 2: take the top 3 scores per person
# 2.2.1 Approach 1: grouping by the raw key skews badly when one key has far
# more records than the others. The standard cure is to scatter the key:
# prepend a random salt, group, take the top 3 within each salted group,
# then strip the salt, group again, and take the final top 3.
def top3(key, it):
    # Sort descending and slice off the best three
    # (Python slices are start-inclusive, stop-exclusive)
    best3 = sorted(it, reverse=True)[0:3]
    return [(key, x) for x in best3]

result1 = rdd1 \
    .map(lambda t: ((random.randint(1, 10), t[0]), t[1])) \
    .groupByKey() \
    .flatMap(lambda t: top3(t[0][1], t[1])) \
    .groupByKey() \
    .flatMap(lambda t: top3(t[0], t[1]))

print(result1.collect())

Result:

[('xiaoliu', 100), ('xiaoliu', 93), ('xiaoliu', 79), ('xiaowang', 69), ('xiaowang', 64), ('xiaowang', 27), ('xiaozhang', 93), ('xiaozhang', 85), ('xiaozhang', 67), ('xiaoji', 99), ('xiaoji', 98), ('xiaoji', 90)]
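
Why is it safe to take the top 3 twice? A score in a person's global top 3 is necessarily in the top 3 of whichever salted bucket it lands in, so the first pass can never discard it. A minimal pure-Python sketch of the two passes (no Spark here; the names buckets, survivors, and merged are only for illustration):

import random

data = [("xiaoliu", 64), ("xiaoliu", 69), ("xiaoliu", 79), ("xiaoliu", 100),
        ("xiaoliu", 19), ("xiaoliu", 53), ("xiaoliu", 93)]

# Pass 1: salt each key, bucket by (salt, name), keep the top 3 per bucket
buckets = {}
for name, score in data:
    buckets.setdefault((random.randint(1, 10), name), []).append(score)
survivors = [(name, s) for (salt, name), scores in buckets.items()
             for s in sorted(scores, reverse=True)[:3]]

# Pass 2: drop the salt, regroup by the real key, take the top 3 again
merged = {}
for name, score in survivors:
    merged.setdefault(name, []).append(score)
print({name: sorted(scores, reverse=True)[:3] for name, scores in merged.items()})
# {'xiaoliu': [100, 93, 79]} -- matches the Spark result above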


2.2 Method 2 (aggregateByKey)
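
aggregateByKey(zeroValue, seqFunc, combFunc) keeps one small accumulator per key instead of collecting every value: each key starts from a fresh copy of zeroValue (PySpark deep-copies it per key, so mutating the list is safe), seqFunc folds a single value into the accumulator within a partition, and combFunc merges the accumulators of two partitions. By capping the accumulator at three scores, at most three values per key leave each partition during the shuffle, instead of every score as with groupByKey.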

2.2.1 Variant 1

# Variant 1:
from pyspark import SparkContext, SparkConf
import os

# Point PySpark at a local Spark installation (raw strings avoid
# backslash-escape problems in Windows paths)
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = r"E:\ProgramFiles\spark-2.2.1-bin-2.6.0-cdh5.14.2"
    os.environ['PYSPARK_PYTHON'] = r"E:\ProgramFiles\Anaconda3\python.exe"

# 1. Build the Spark context
config = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("TopNDemo")

sc = SparkContext.getOrCreate(config)

# 2. Read the data into an RDD
path = "G:/liu/topn.txt"
rdd = sc.textFile(path)

rdd1 = rdd \
    .map(lambda line: line.split(" ")) \
    .filter(lambda arr: len(arr) == 2) \
    .map(lambda arr: (arr[0], int(arr[1])))

# Accumulator: a descending list of at most the 3 best scores seen so far
zeroValue = []

def seqFunc(a, b):
    # Fold one score into the accumulator, then trim back to the top 3
    a.append(b)
    return sorted(a, reverse=True)[0:3]

def combFunc(c, d):
    # Merge two partial accumulators, then trim back to the top 3
    return sorted(c + d, reverse=True)[0:3]

result2 = rdd1 \
    .aggregateByKey(zeroValue, seqFunc, combFunc)
print(result2.collect())
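
Result (one top-3 list per key; the key order of collect() may vary between runs):

[('xiaoliu', [100, 93, 79]), ('xiaowang', [69, 64, 27]), ('xiaozhang', [93, 85, 67]), ('xiaoji', [99, 98, 90])]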

2.2.2 Variant 2

# Variant 2:
from pyspark import SparkContext, SparkConf
import os
from functools import reduce

# Point PySpark at a local Spark installation (raw strings avoid
# backslash-escape problems in Windows paths)
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = r"E:\ProgramFiles\spark-2.2.1-bin-2.6.0-cdh5.14.2"
    os.environ['PYSPARK_PYTHON'] = r"E:\ProgramFiles\Anaconda3\python.exe"

# 1. Build the Spark context
config = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("TopNDemo")

sc = SparkContext.getOrCreate(config)

# 2. Read the data into an RDD
path = "G:/liu/topn.txt"
rdd = sc.textFile(path)

rdd1 = rdd \
    .map(lambda line: line.split(" ")) \
    .filter(lambda arr: len(arr) == 2) \
    .map(lambda arr: (arr[0], int(arr[1])))

zeroValue = []

def f(a, b):
    # Fold one score into the accumulator, then trim back to the top 3
    a.append(b)
    return sorted(a, reverse=True)[0:3]

seqFunc = f
# Merge two accumulators by folding every score of c into d, one at a time
combFunc = lambda c, d: reduce(f, c, d)

result3 = rdd1 \
    .aggregateByKey(zeroValue, seqFunc, combFunc)
print(result3.collect())
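
The reduce-based combFunc is equivalent to the explicit merge of Variant 1: reduce(f, c, d) starts from accumulator d and folds each score of accumulator c into it with f, re-trimming to three after every step, so the printed result matches Variant 1 up to key order.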
