学生课程分数的Spark SQL分析

读学生课程分数文件chapter4-data01.txt,创建DataFrame。

url = "file:///D:/chapter4-data01.txt"
rdd = spark.sparkContext.textFile(url).map(lambda line:line.split(,))
rdd.take(3)

from pyspark.sql.types import IntegerType,StringType,StructField,StructType
from pyspark.sql import Row

#生成“表头”
fields = [StructField(name,StringType(),True),StructField(course,StringType(),True),StructField(score,IntegerType(),True)]
schema = StructType(fields)

# 生成“表中的记录”
data = rdd.map(lambda p:Row(p[0],p[1],int(p[2])))

# 把“表头”和“表中的记录”拼接在一起
df_scs = spark.createDataFrame(data,schema)
df_scs.printSchema()
df_scs.show()

学生课程分数的Spark SQL分析

 

 

 

用DataFrame的操作或SQL语句完成以下数据分析要求,并和用RDD操作的实现进行对比:

1.每个分数+5分。

# 1.每个分数+5分。
df_scs.select(name,course,df_scs.score+5).show()

学生课程分数的Spark SQL分析

2.总共有多少学生?

# 2.总共有多少学生?
df_scs.select(df_scs.name).distinct().count()

df_scs.select(df_scs[name]).distinct().count()

df_scs.select(name).distinct().count()

学生课程分数的Spark SQL分析

3.总共开设了哪些课程?

# 3.总共开设了哪些课程?
df_scs.select(course).distinct().show()

学生课程分数的Spark SQL分析

4.每个学生选修了多少门课?

# 4.每个学生选修了多少门课?
df_scs.groupBy(name).count().show()

学生课程分数的Spark SQL分析

5.每门课程有多少个学生选?

# 5.每门课程有多少个学生选?
df_scs.groupBy(course).count().show()

学生课程分数的Spark SQL分析

6.每门课程大于95分的学生人数?

# 6.每门课程大于95分的学生人数?
df_scs.filter(df_scs.score>95).groupBy(course).count().show()

学生课程分数的Spark SQL分析

7.Tom选修了几门课?每门课多少分?

# 7.Tom选修了几门课?每门课多少分?
df_scs.filter(df_scs.name==Tom).show()

学生课程分数的Spark SQL分析

8.Tom的成绩按分数大小排序。

# 8.Tom的成绩按分数大小排序。
df_scs.filter(df_scs.name==Tom).orderBy(df_scs.score).show()

学生课程分数的Spark SQL分析

9.Tom的平均分。

# 9.Tom的平均分。
df_scs.filter(df_scs.name==Tom).agg({"score":"mean"}).show()

学生课程分数的Spark SQL分析

10.求每门课的平均分,最高分,最低分。

# 10.求每门课的平均分,最高分,最低分。
df_scs.groupBy("course").agg({"score": "mean"}).show()

df_scs.groupBy("course").agg({"score": "max"}).show()

df_scs.groupBy("course").agg({"score": "min"}).show()

学生课程分数的Spark SQL分析

11.求每门课的选修人数及平均分,精确到2位小数。

# 11.求每门课的选修人数及平均分,精确到2位小数。
df_scs.select(countDistinct(name).alias(学生人数),countDistinct(course).alias(课程数),round(mean(score),2).alias(所有课的平均分)).show()

学生课程分数的Spark SQL分析

12.每门课的不及格人数,通过率

# 12.每门课的不及格人数,通过率
df_scs.filter(df_scs.score<60).groupBy(course).count().show()

学生课程分数的Spark SQL分析

13.结果可视化。

 

 

 

 

  函数:http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#module-pyspark.sql.functions 

学生课程分数的Spark SQL分析

上一篇:安装postgresql


下一篇:Jdbc