学生课程分数的 Spark SQL 分析
读学生课程分数文件 chapter4-data01.txt,创建 DataFrame
from pyspark.sql.types import *
from pyspark.sql import Row
# 下面生成“表头”
fields = [StructField('name',StringType(),True),StructField('course',StringType(),True),StructField('score',IntegerType(),True)]
schema = StructType(fields)
# 下面生成“表中的记录”
lines = spark.sparkContext.textFile("file:///usr/local/spark/mycode/rdd/chapter4-data01.txt")
parts = lines.map(lambda x: x.split(","))
data = parts.map(lambda p:Row(p[0],p[1],int(p[2])))
# 把“表头”和“表中的记录”拼接在一起
df = spark.createDataFrame(data,schema)
df.printSchema()
df.show()
用 DataFrame 的操作或 SQL 语句完成以下数据分析要求,并和用 RDD 操作的实现进行对比:
df.select('name','course',df.score+5).show()
df.select('name').distinct().count()
df.select('course').distinct().show()
df.groupBy('name').count().show()
df.groupBy('course').count().show()
df.filter(df.score>95).groupBy('course').count().show()
df.filter(df.name == 'Tom').show()
df.filter(df.name == 'Tom').orderBy(df.score).show()
df.filter(df.name == 'Tom').agg({"score":"mean"}).show()
# 求每门课的平均分
df.groupBy('course').avg('score').show()
# 求每门课的最高分
df.groupBy('course').max('score').show()
# 求每门课的最低分
df.groupBy('course').min('score').show()
from pyspark.sql.functions import *
df.select(countDistinct('name').alias('学生人数'),countDistinct('course').alias('课程数'),round(mean('score'),2).alias('所有课的平均分')).show()
df.filter(df.score<60).groupBy('course').count().show()