生成表头
from pyspark.sql.types import *
from pyspark.sql import Row
schemaString="name course score"
fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")]
bt=StructType(fields)
bt
生成数据
xssj=spark.sparkContext.textFile("file:///usr/local/spark/mycode/rdd/xs.txt").map(lambda line:line.split(',')).map(lambda x:Row(name=x[0],course=x[1],score=int(x[2])))
xssj.take(3)
拼接
xsb=spark.createDataFrame(xssj,bt)
xsb.show()
用DataFrame的操作或SQL语句完成以下数据分析要求,并和用RDD操作的实现进行对比:
- 每个分数+5分。
- xsb.select('name','course',xsb.score+5).show()
- 总共有多少学生?
- xsb.select('name').distinct().count()
- 总共开设了哪些课程?
- xsb.select('course').distinct().show()
- 每个学生选修了多少门课?
- xsb.groupBy('name').count().show()
-
- 每门课程有多少个学生选?
- xsb.groupBy('course').count().show()
- 每门课程大于95分的学生人数?
- xsb.filter(xsb.score>95).groupBy('course').count().show()
- Tom选修了几门课?每门课多少分?
- xsb.filter(xsb.name=='Tom').show()
- Tom的成绩按分数大小排序。
- xsb.filter(xsb.name=='Tom').orderBy(xsb.score).show()
- Tom的平均分。
- xsb.filter(xsb.name=='Tom').agg({'score':'mean'}).show()
- 求每门课的平均分,最高分,最低分。
-
xsb.filter(xsb.name=='Tom').groupBy('course').agg({'score':'mean'}).show()
- xsb.filter(xsb.name=='Tom').groupBy('course').agg({'score':'max'}).show()
- xsb.filter(xsb.name=='Tom').groupBy('course').agg({'score':'min'}).show()