Spark SQL 初探: 使用大数据分析2000万数据

去年网上曾放出个2000W的开房记录的数据库, 不知真假。 最近在学习Spark, 所以特意从网上找来数据测试一下, 这是一个绝佳的大数据素材。 如果数据涉及到个人隐私,请尽快删除, 本站不提供此类数据。你可以写个随机程序生成2000W的测试数据, 以CSV格式。

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更 好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

Spark是一个高效的分布式计算系统,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上层的API, 同样的算法在Spark中实现往往只有Hadoop的1/10或者1/100的长度。Shark类似“SQL on Spark”,是一个在Spark上数据仓库的实现,在兼容Hive的情况下,性能最高可以达到Hive的一百倍。

Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

2014年处, Apache 基金会宣布旗下的 Apache Spark 项目成为基金会的*项目,拥有*域名 http://spark.apache.org/。 Spark 的用户包括:阿里巴巴、Cloudera、Databricks、IBM、英特尔和雅虎等知名厂商。

Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把 行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件, 最重要的是它可以支持用HiveQL从hive里面读取数据。

在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容 忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设 计的部分无疑成为了整个项目的瓶颈。 详细内容请参看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark

当前Spark SQL还处于alpha阶段,一些API在将将来的版本中可能会有所改变。

我也翻译几篇重要的Spark文档,你可以在我的网站找到。 Spark翻译文档

本文主要介绍了下面几个知识点:

  • Spark读取文件夹的文件

  • Spark filter和map使用

  • Spark sql语句调用

  • 自定义Spark sql的函数

提前讲一下,我也是最近才学习Spark及其相关的技术如Scala,下面的例子纯粹为了验证性的试验, 相信例子代码很很多优化的地方。

安装和配置Spark

当前最新的Spark版本为1.1.1, 因为我们以Standalone方式运行Spark,所以直接随便挑一个版本, 比如spark-1.1.1-bin- hadoop2.4.tgz, 解压到你的机器上。 我使用的CentOS 6.4。 具体来讲,它是我笔记本的一个虚拟机, 4个核, 4G内存。

在/opt解压它, 命令行中进入解压后的目录/opt/spark-1.1.1-bin-hadoop2.4。

运行 ./bin/spark-shell 就可以启动一个交互式的spark shell控制台, 在其中可以执行scala代码。

回到顶部

Spark初试

因为我们以本地单机的形式测试Spark, 你需要配置以下你的spark, 否则在分析大数据时很容易出现内存不够的问题。 在我的机器上, conf文件夹下复制一份spark-defaults.conf,将使用的内存增大一些:

?

1

2

|

spark.executor.memory 2g

spark.driver.memory 2g

---|---

 

启动shark-shell的时候设置使用4个核。

?

1

|

[root @colobu conf]# ./bin/spark-shell --master local[ 4 ]

---|---

 

根据 Spark 快速入门 中的介绍运行个例子测试一下:

?

1

2

3

4

5

|

scala> val textFile = sc.textFile( "README.md" )

14 / 12 / 11 13 : 52 : 00 INFO MemoryStore: ensureFreeSpace( 163705 ) called with curMem= 0 , maxMem= 1111794647

14 / 12 / 11 13 : 52 : 00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 1060.1 MB)

textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[ 1 ] at textFile at <console>: 12

scala> textFile.count()

---|---

 

这个例子从Spark解压目录下的README.md文件创建一个RDD,并统计此文件有多少行。

再看一个抛针法计算PI值的例子。

?

1

2

3

4

5

6

7

|

val NUM_SAMPLES= 1000000

val count = sc.parallelize( 1 to NUM_SAMPLES).map{i =>

val x = Math.random()

val y = Math.random()

if (x*x + y*y < 1 ) 1 else 0

}.reduce(_ + _)

println( "Pi 值大约为 " \+ 4.0 * count / NUM_SAMPLES)

---|---

 

结果为:

?

1

|

Pi 值大约为 3.141408

---|---

 

到目前为止,我们搭建好了一个Spark环境, 并简单进行了测试。 下一步我们使用Spark SQL分析前面所说的数据。

回到顶部

使用Spark SQL分析数据

这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。 当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。 实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。 反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。

先看代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

|

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.createSchemaRDD

case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)

val customer = sc.textFile( "/mnt/share/2000W/*.csv" ).map(_.split( "," )).filter(line => line.length > 7 ).map(p => Customer(p( 0 ), p( 5 ), p( 4 ), p( 6 ), p( 7 ))).distinct()

customer.registerTempTable( "customer" )

def toInt(s: String):Int = {

try {

s.toInt

} catch {

case e:Exception => 9999

}

}

def myfun(birthday: String) : String = {

var rt = "未知"

if (birthday.length == 8 ) {

val md = toInt(birthday.substring( 4 ))

if (md >= 120 & md <= 219 )

rt = "水瓶座"

else if (md >= 220 & md <= 320 )

rt = "双鱼座"

else if (md >= 321 & md <= 420 )

rt = "白羊座"

else if (md >= 421 & md <= 521 )

rt = "金牛座"

else if (md >= 522 & md <= 621 )

rt = "双子座"

else if (md >= 622 & md <= 722 )

rt = "巨蟹座"

else if (md >= 723 & md <= 823 )

rt = "狮子座"

else if (md >= 824 & md <= 923 )

rt = "处女座"

else if (md >= 924 & md <= 1023 )

rt = "天秤座"

else if (md >= 1024 & md <= 1122 )

rt = "天蝎座"

else if (md >= 1123 & md <= 1222 )

rt = "射手座"

else if ((md >= 1223 & md <= 1231 ) | (md >= 101 & md <= 119 ))

rt = "摩蝎座"

else

rt = "未知"

}

rt

}

sqlContext.registerFunction( "constellation" , (x:String) => myfun(x))

var result = sqlContext.sql( "SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)" )

result.collect().foreach(println)

---|---

 

为了使用spark sql,你需要引入 sqlContext.createSchemaRDD . Spark sql一个核心对象就是 SchemaRDD 。 上面的 import 可以隐式的将一个RDD转换成SchemaRDD。 接着定义了 Customer 类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。 接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。 因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数 myfun , 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。

然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。 最后将结果打印出来。

?

1

2

3

4

5

6

7

8

9

10

11

12

|

[双子座, 1406018 ]

[双鱼座, 1509839 ]

[摩蝎座, 2404812 ]

[金牛座, 1406225 ]

[水瓶座, 1635358 ]

[巨蟹座, 1498077 ]

[处女座, 1666009 ]

[天秤座, 1896544 ]

[白羊座, 1409838 ]

[射手座, 1614915 ]

[未知, 160483 ]

[狮子座, 1613529 ]

---|---

 

看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。

我们也可以分析一下开房的男女比例:

?

1

2

3

|

......

result = sqlContext.sql( "SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender" )

result.collect().foreach(println)

---|---

 

结果显示男女开房的人数大约是2:1

?

1

2

|

[F, 6475461 ]

[M, 12763926 ]

---|---

 

上一篇:vscode 写 markdown 文档


下一篇:Chirpstack笔记 -- GRPC API 的调用