读学生课程分数文件chapter4-data01.txt,创建DataFrame。
url = "file:///D:/chapter4-data01.txt" rdd = spark.sparkContext.textFile(url).map(lambda line:line.split(‘,‘)) rdd.take(3) from pyspark.sql.types import IntegerType,StringType,StructField,StructType from pyspark.sql import Row #生成“表头” fields = [StructField(‘name‘,StringType(),True),StructField(‘course‘,StringType(),True),StructField(‘score‘,IntegerType(),True)] schema = StructType(fields) # 生成“表中的记录” data = rdd.map(lambda p:Row(p[0],p[1],int(p[2]))) # 把“表头”和“表中的记录”拼接在一起 df_scs = spark.createDataFrame(data,schema) df_scs.printSchema() df_scs.show()
用DataFrame的操作或SQL语句完成以下数据分析要求,并和用RDD操作的实现进行对比:
1.每个分数+5分。
# 1.每个分数+5分。 df_scs.select(‘name‘,‘course‘,df_scs.score+5).show()
2.总共有多少学生?
# 2.总共有多少学生? df_scs.select(df_scs.name).distinct().count() df_scs.select(df_scs[‘name‘]).distinct().count() df_scs.select(‘name‘).distinct().count()
3.总共开设了哪些课程?
# 3.总共开设了哪些课程? df_scs.select(‘course‘).distinct().show()
4.每个学生选修了多少门课?
# 4.每个学生选修了多少门课? df_scs.groupBy(‘name‘).count().show()
5.每门课程有多少个学生选?
# 5.每门课程有多少个学生选? df_scs.groupBy(‘course‘).count().show()
6.每门课程大于95分的学生人数?
# 6.每门课程大于95分的学生人数? df_scs.filter(df_scs.score>95).groupBy(‘course‘).count().show()
7.Tom选修了几门课?每门课多少分?
# 7.Tom选修了几门课?每门课多少分? df_scs.filter(df_scs.name==‘Tom‘).show()
8.Tom的成绩按分数大小排序。
# 8.Tom的成绩按分数大小排序。 df_scs.filter(df_scs.name==‘Tom‘).orderBy(df_scs.score).show()
9.Tom的平均分。
# 9.Tom的平均分。 df_scs.filter(df_scs.name==‘Tom‘).agg({"score":"mean"}).show()
10.求每门课的平均分,最高分,最低分。
# 10.求每门课的平均分,最高分,最低分。 df_scs.groupBy("course").agg({"score": "mean"}).show() df_scs.groupBy("course").agg({"score": "max"}).show() df_scs.groupBy("course").agg({"score": "min"}).show()
11.求每门课的选修人数及平均分,精确到2位小数。
# 11.求每门课的选修人数及平均分,精确到2位小数。 df_scs.select(countDistinct(‘name‘).alias(‘学生人数‘),countDistinct(‘course‘).alias(‘课程数‘),round(mean(‘score‘),2).alias(‘所有课的平均分‘)).show()
12.每门课的不及格人数,通过率
# 12.每门课的不及格人数,通过率 df_scs.filter(df_scs.score<60).groupBy(‘course‘).count().show()
13.结果可视化。
函数:http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#module-pyspark.sql.functions