目录
一、实验内容
1.pyspark交互式编程
本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了DataBase这门课。
2.编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
······
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 y
······
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z
······
3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
二、需求描述
本次实验需要用到Ubuntu虚拟机,并在虚拟机配置spark环境,从而实现RDD编程。本次实验主要是对Spark的RDD基本操作及键值对操作,分别是pyspark交互式编程、编写独立应用程序实现数据去重以及编写独立应用程序实现求平均值问题三个内容。在pyspark交互式编程中,对data.txt文件实验数据进行计算,主要需要用到map()函数对数据进行数据拆分、reduce()函数对数据进行数据统计以及filter()函数对数据进行过滤:在编写独立应用程序实现数据去重中,主要需要用到union()函数将两个文件进行合并、distinct()函数剔除其中重复的内容以及sortBy()函数对文件内容进行排序;在编写独立应用程序实现求平均值问题,主要需要用到reduceByKey()函数对数据进行分组统计、repartition()函数对数据重新分区、saveAsTextFile()函数将数据集的元素以textfile的形式保存到文件中。
三、实验平台和环境搭建
1.实验平台
操作系统:Ubuntu16.04
Spark版本:2.4.0
Python版本:3.4.3
2.环境搭建
1.下载spark包。把文件spark-1.6.0-bin-without-hadoop-1.tgz下载到本地电脑,假设保存在“/home/hadoop/Downloads/”目录下。在下载目录中查看下载结果,结果如图所示:
2.安装spark。将下载的spark包解压到/usr/local目录下,然后进入/usr/local目录,将spark包移动到下一级目录/spark,然后再对spark包进行安装,命令及结果如下所示:
$ sudo tar -zxf ~/下载/spark-1.6.0-bin-without-hadoop-1.tgz -C /usr/local // 解压
$ cd /usr/local
$ sudo mv ./spark-1.6.0-bin-without-hadoop/ ./spark //移动
$ sudo chown -R hadoop:hadoop ./spark //安装
3.进入/usr/local/spark目录,使用vim编辑器(查看vim编辑器使用方法)打开了hadoop这个用户的环境变量配置文件加以编辑,编辑在首行加入内容如表格所示,编辑结果如下所示。最后保存.bashrc文件并退出vim编辑器。命令及结果如下所示
$ cd /usr/local/spark
$ sudo vim ~/.bashrc //编辑环境变量配置
环境:
export HIVE_HOME= /usr/local/hive
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:/usr/local/hbase/bin:/usr/local/scala/bin:/usr/local/hadoop/bin:$HIVE_HOME/bin:$SPARK_HOME/bin
export JAVA_HOME = /usr/lib/jvm/default-java
export SPARK_HOME=/usr/local/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.9-src.zip:PYTHONPATH
export PYSPARK_PYTHON = python3
4.执行$ source ~/.bashrc让.bashrc文件的配置立即生效。
$ source ~/.bashrc
5.使用pyspark命令启动spark并进行简单的测试,如图所示
四、数据上传
1、将所需的文件下载到本地电脑,假设保存在“/home/hadoop/Downloads/”目录下,在下载目录中查看下载结果,结果如图所示:
2、使用mv将下载目录下的文件全部移动到/usr/local/spark目录下,命令及结果如下图所示:
$ sudo mv ./home/hadoop/下载/* ./usr/local/spark
3、在/usr/local/spark查看数据上传结果,结果如图所示:
$ vim A
$ vim B
$ vim Algorithm
$ vim Database
$ vim Python
五、实验步骤及过程
1.pyspark交互式编程。
根据文件data.txt实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生。
先将文件data.txt的内容读取出来并赋予对象lines,然后对lines用map()进行数据拆分赋予对象res,再对res用distinct()进行数据去重操作赋予sum,最后对sum进行统计,统计结果如图5.1.1所示,命令如下所示。最后如结果图5…1.1可得到结果:该系总共有256个学生。
>>> lines = sc.textFile(“file///home/hadoop/下载/data.txt //获取数据
>>> res=lines.map(lambda x:x.split(",")).map(lambda x:x[0])//数据拆分
>>> sum = res.distinct() //数据去重
>>> sum.count() //数据统计
(2)该系共开设了多少门课程;
对lines按条件用map()进行数据拆分赋予res1,然后对res1用distinct()进行去重操作并赋予对象dis_res,最后对对象dis_res进行统计,统计结果如图所示,命令如下所示。根据图可得:总共开设了8门课程。
>>> res1 = lines.map(lambda x:x.split(",")).map(lambda x:x[1])
>>> dis_res = res1.distinct()
>>> dis_res.count()
(3)Tom同学的总成绩平均分是多少;
对lines按条件用map()进行数据拆分赋予res2,查看拆分后的结果如图所示;对res2进行统计并赋予对象num2,再对res2按要求进行数据拆分赋予对象score,再对score用reduce()函数进行数据统计并赋予对象sum_score,最后求得Tom的总成绩平均分svg如图可知平均分svg为30.8分,命令如下所示。
>>> res2 = lines.map(lambda x:x.split(',')).map(lambda x:x[0]=='Tom')
>>> res2.foreach(print)
>>> num2 = res2.count()
>>> score = res2.map(lambda x:int(x[2]))
>>> sum_score = score.reduce(lambda x,y:x+y)
>>> svg = sum_score/num2
>>> print(avg)
(4)求每名同学的选修的课程门数;
对lines按条件用map()进行数据拆分赋予res3,再对res3用reduceByKey()函数进行数据统计赋予each_res,最后打印each_res得到每名同学的选修的课程门数如图,命令如下所示:
>>> res3 = lines.map(lambda x:x.split(',')).map(lambda x:(x[0],1))
>>> each_res = res3.reduceByKey(lambda x,y:x+y)
>>> each_res.foreach(print)
(5)该系DataBase课程共有多少人选修;
对lines按条件用map()进行数据拆分赋予res4,最后对res4进行统计得到结果如图所示,命令如下所示,结果可知该系DataBase课程共有1764人选修.
>>> res4 = lines.map(lambda x:x.split(',')).filter(lambda x:x[1]=='DataBase')
>>> res.count()
(6)各门课程的平均分是多少;
对lines按条件用map()进行数据拆分赋予res5,对res5用reduceByKey()函数进行数据统计赋予对象temp,然后对temp用map()函数进行数据拆分球平均分,得到结果avg1并打印可得各门课程的平均分如图所示,命令如下所示:
>>> res5 = lines.map(lambda x:x.split(',')).map(lambda x:(x[1],(int([2]),1)))
>>> temp = res5.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
>>> avg1 = temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
>>> avg1.foreach(print)
(7)使用累加器计算共有多少人选了DataBase这门课。
对lines按条件用map()进行数据拆分赋予res6,再对res6用accumulator()进行数据累加,最后得到accum的值如图所示,命令如下所示,最后可得结果共有1764人选了DataBase这门课
>>> res6 = lines.map(lambda x:x.split(',')).filter(lambda x:x[1]=='DataBase')
>>> accum = sc.accumulator(0)
>>> res6.foreach(lambda x:accum.add(1))
>>>> accum.value
2.编写独立应用程序实现数据去重。
根据文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C:
(1)进入文件A,B所在路径,编辑一个C.py文件,编辑内容如表格所示,编辑命令如图所示,编辑结果如图文件内容如图所示,后一个Python脚本调用该模块:
$ cd /usr/local/spark
$ vim A
$ vim B
$ vim C.py
$ python3 C.py
编辑内容:
from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext("local","spark")
#加载文件A、B,创建RDD
lines1 = sc.textFile("file:///usr/local/spark/A")
lines2 = sc.textFile("file:///usr/local/spark/B")
#合并文件
lines = lines1.union(lines2)
#去重复
distinct_lines = lines.distinct()
#排序
res = distinct_lines.sortBy(lambda x:x)
#让合并结果放入一个文件中
res.repartition(1).saveAsTextFile ("file:///usr/local/spark/result")
(2)查看运行完的文件是否生成成功,并查看最后的文件内容是否正确,结果如图所示:
3.编写独立应用程序实现求平均值问题.
文件Algorithm,Database,Python分别表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中:
(1)进入/usr/local/spark目录,分别编辑Algorith),Python,Database文件,文件内容如图所示,再编辑一个脚本文件avg.pya文件,编辑内容如下所示:
$ vim Algorithm
$ vim Python
$ vim Database
$ vim avg.py
$ python3 avg.py
Algorithm:
小明a 92
小红a 87
小新a 75
小丽a 90
小明b 92
······
Database:
小明a 92
小红a 87
小新a 75
小丽a 84
小明b 92
······
Python:
小明w 85
小红w 87
小新w 91
小丽w 90
小明v 84
······
avg.py:
from pyspark import SparkContext
sc = SparkContext("local","spark")
#加载文件,生成RDD
lines1 = sc.textFile("file:///usr/local/spark/Algorithm")
lines2 = sc.textFile("file:///usr/local/spark/Database")
lines3 = sc.textFile("file:///usr/local/spark/Python")
#合并三个文件
lines = lines1.union(lines2).union(lines3)
#做拆分
data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
#做reduceByKey,分组统计
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
result.repartition(1).saveAsTextFile("file:///usr/local/spark/result1")
(2)查看运行完的文件是否生成成功,并查看最后的文件内容是否正确。平均值如图所示: