文章目录
- 一、需求分析
- 二、环境介绍
- 三、pyspark交互式编程
- 四、编写独立应用程序实现数据去重
- 五、编写独立应用程序实现求平均值问题
- 六、实验结果查看
一、需求分析
大数据这一术语正是产生在全球数据爆炸增长的背景下,用来形容庞大的数据集合。与传统的数据集合相比,大数据通常包含大量的非结构化数据,且大数据需要更多的实时分析。大数据作为“互联网+”行动计划的主要内容,其重要性得到了广泛重视。
RDD是Spark提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。通俗点来讲,可以将RDD理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个RDD可以分成多个分区,每个分区就是一个数据集片段。一个RDD的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
1、环境安装,安装Spark和Java。
2、pyspark交互式编程。
3、编写独立应用程序实现数据去重。
4、编写独立应用程序实现求平均值问题。
5、实验结果查看。
二、环境介绍
(一)安装Spark。
Spark是一种与Hadoop相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
1.安装Spark。
(1)选择相应的Spark版本等进行安装。
2.登录系统。
(1)进入终端使用下面命令登录系统。
sudo tar -zcf ~/下载/spark-1.6.2-bin-without-hadoop.tgz -C /usr/local
sudo mv ./spark-1.6.2-bin-without-hadoop/ ./spark
sudo chown -R hadoop:Hadoop ./spark
3.修改Spark的相关配置文件。
(1)使用以下命令对Spark配置文件进行修改。
cd /usr/local/spark
cp ./conf/spark-evn.sh.template ./conf/spark-env.sh
4.检验Spark是否成功安装。
(1)使用以下命令检验Spark是否安装成功。
cd /usr/local/spark
Bin/run-example SparkPi
(2)安装成功,会出现以下图片。
(二)在spark shell中运行代码。
1.在四个CPU核心上运行spark-shell。
(1)输入以下代码运行。
cd /usr/local/spark
./bin/spark-shell –master local[4]
(2)启动spark-shell后,就会进入“scala>”命令提示符状态。
(三)Java独立应用编程。
Java是一门面向对象编程语言,不仅吸收了C++语言的各种优点,还摒弃了C++里难以理解的多继承、指针等概念,因此Java语言具有功能强大和简单易用两个特征。Java语言作为静态面向对象编程语言的代表,极好地实现了面向对象理论,允许程序员以优雅的思维方式进行复杂的编程。
1.安装maven。
(1)输入以下代码运行。
sudo unzip ~/下载/apache-maven-3.3.9-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.3.9/ ./maven
sudo chown -R Hadoop ./mave
(2)第一行命令成功输入运行后。
(3)Java应用程序代码。
cd ~
mkdir -p ./sparkapp2/src/main/java
(4)在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加相应代码。
(5)使用maven打包java程序。
(6)通过将生成的jar包通过spark-submit提交到Spark中运行,输入以下代码。
/usr/local/spark/bin/spark-submit –class “SimpleApp” ~/sparkapp2/target/simple-project-1.0.jar
三、pyspark交互式编程
(一)数据来源。
由老师提供相应文档data.txt,该数据集包含了某大学计算机系的成绩。
(二)数据上传。
1.将文件data.txt放入相应地方,并放入usr/local/spark/zm路径中。
2.输入命令pyspark启动。
(三)输入相关代码。
(1)该系总共有多少学生。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) #获取每行数据的第1列
>>> distinct_res = res.distinct() #去重操作
>>> distinct_res.count() #取元素总个数
(2)该系共开设了多少门课程。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) #获取每行数据的第2列
>>> distinct_res = res.distinct() #去重操作
>>> distinct_res.count() #取元素总个数
(3)Tom同学的总成绩平均分是多少。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") #筛选Tom同学的成绩信息
>>> res.foreach(print)
>>> score = res.map(lambda x:int(x[2])) #提取Tom同学的每门成绩,并转换为int类型
>>> num = res.count() #Tom同学选课门数
>>> sum_score = score.reduce(lambda x,y:x+y) #Tom同学的总成绩
>>> avg = sum_score/num #总成绩/门数=平均分
>>> print(avg)
(4)求每名同学的选修的课程门数。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) #学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1)
>>> each_res = res.reduceByKey(lambda x,y: x+y) #按学生姓名获取每个学生的选课总数
>>> each_res.foreach(print)
(5)该系DataBase课程共有多少人选修。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
>>> res.count()
(6)各门课程的平均分是多少。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) #为每门课程的分数后面新增一列1,表示1个学生选择了该课程。格式如('Network', (44, 1))
>>> temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #按课程名聚合课程总分和选课人数。格式如('Network', (7370, 142))
>>> avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2))) #课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数
>>> avg.foreach(print)
(7)使用累加器计算共有多少人选了DataBase这门课。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt")
>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase") #筛选出选了DataBase课程的数据
>>> accum = sc.accumulator(0) #定义一个从0开始的累加器accum
>>> res.foreach(lambda x:accum.add(1)) #遍历res,每扫描一条数据,累加器加1
>>> accum.value #输出累加器的最终值
四、编写独立应用程序实现数据去重
(一)数据来源。
由老师提供相应文档A.txt和B.txt,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
(二)输入相关代码。
1.将文件A.txt和B.txt放入相应地方,并放入usr/local/spark/zm路径中。
2.使用命令vim C.py新建并打开Python文件,并输入以下代码。
from pyspark import SparkContext
#初始化SparkContext
sc1 = SparkContext('local','zm')
#加载两个文件A和B
lines1 = sc.textFile("file:///usr/local/spark/zm/A.txt")
lines2 = sc.textFile("file:///usr/local/spark/zm/B.txt")
#合并两个文件的内容
lines = lines1.union(lines2)
#去重操作
distinct_lines = lines.distinct()
#排序操作
res = distinct_lines.sortBy(lambda x:x)
#将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件
res.repartition(1).saveAsTextFile("file:///usr/local/spark/zm/result")
五、编写独立应用程序实现求平均值问题
(一)数据来源。
由老师提供相应文档Algorithm.txt、Database.txt、Python.txt,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
(二)输入相关代码。
1.将文件Algorithm.txt、Database.txt、Python.txt放入相应地方,并放入usr/local/spark/zmzm路径中。
2.使用命令avg.py新建并打开Python文件,并输入以下代码。
from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext('local',' zmzm')
#加载三个文件Algorithm.txt、Database.txt和Python.txt
lines1 = sc.textFile("file:///usr/local/spark/zmzm/Algorithm.txt")
lines2 = sc.textFile("file:///usr/local/spark/zmzm/Database.txt")
lines3 = sc.textFile("file:///usr/local/spark/zmzm/Python.txt")
#合并三个文件的内容
lines = lines1.union(lines2).union(lines3)
#为每行数据新增一列1,方便后续统计每个学生选修的课程数目。
data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
#根据key也就是学生姓名合计每门课程的成绩,以及选修的课程数目。
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
#利用总成绩除以选修的课程数来计算每个学生的每门课程的平均分,并利用round(x,2)保留两位小数
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
#将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到三个文件
result.repartition(1).saveAsTextFile("file:///usr/local/spark/zmzm/result")