去年网上曾放出个2000W的开房记录的数据库, 不知真假。 最近在学习Spark, 所以特意从网上找来数据测试一下, 这是一个绝佳的大数据素材。 如果数据涉及到个人隐私,请尽快删除, 本站不提供此类数据。你可以写个随机程序生成2000W的测试数据, 以CSV格式。
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更 好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
Spark是一个高效的分布式计算系统,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上层的API, 同样的算法在Spark中实现往往只有Hadoop的1/10或者1/100的长度。Shark类似“SQL on Spark”,是一个在Spark上数据仓库的实现,在兼容Hive的情况下,性能最高可以达到Hive的一百倍。
Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
2014年处, Apache 基金会宣布旗下的 Apache Spark 项目成为基金会的*项目,拥有*域名 http://spark.apache.org/。 Spark 的用户包括:阿里巴巴、Cloudera、Databricks、IBM、英特尔和雅虎等知名厂商。
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把 行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件, 最重要的是它可以支持用HiveQL从hive里面读取数据。
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容 忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设 计的部分无疑成为了整个项目的瓶颈。 详细内容请参看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark
当前Spark SQL还处于alpha阶段,一些API在将将来的版本中可能会有所改变。
我也翻译几篇重要的Spark文档,你可以在我的网站找到。 Spark翻译文档
本文主要介绍了下面几个知识点:
-
Spark读取文件夹的文件
-
Spark filter和map使用
-
Spark sql语句调用
-
自定义Spark sql的函数
提前讲一下,我也是最近才学习Spark及其相关的技术如Scala,下面的例子纯粹为了验证性的试验, 相信例子代码很很多优化的地方。
安装和配置Spark
当前最新的Spark版本为1.1.1, 因为我们以Standalone方式运行Spark,所以直接随便挑一个版本, 比如spark-1.1.1-bin- hadoop2.4.tgz, 解压到你的机器上。 我使用的CentOS 6.4。 具体来讲,它是我笔记本的一个虚拟机, 4个核, 4G内存。
在/opt解压它, 命令行中进入解压后的目录/opt/spark-1.1.1-bin-hadoop2.4。
运行 ./bin/spark-shell
就可以启动一个交互式的spark shell控制台, 在其中可以执行scala代码。
Spark初试
因为我们以本地单机的形式测试Spark, 你需要配置以下你的spark, 否则在分析大数据时很容易出现内存不够的问题。 在我的机器上, conf文件夹下复制一份spark-defaults.conf,将使用的内存增大一些:
1
2
|
spark.executor.memory 2g
spark.driver.memory 2g
---|---
启动shark-shell的时候设置使用4个核。
1
|
[root
@colobu
conf]# ./bin/spark-shell --master local[
4
]
---|---
根据 Spark 快速入门 中的介绍运行个例子测试一下:
1
2
3
4
5
|
scala> val textFile = sc.textFile(
"README.md"
)
14
/
12
/
11
13
:
52
:
00
INFO MemoryStore: ensureFreeSpace(
163705
) called with curMem=
0
, maxMem=
1111794647
14
/
12
/
11
13
:
52
:
00
INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size
159.9
KB, free
1060.1
MB)
textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[
1
] at textFile at <console>:
12
scala> textFile.count()
---|---
这个例子从Spark解压目录下的README.md文件创建一个RDD,并统计此文件有多少行。
再看一个抛针法计算PI值的例子。
1
2
3
4
5
6
7
|
val NUM_SAMPLES=
1000000
val count = sc.parallelize(
1
to NUM_SAMPLES).map{i =>
val x = Math.random()
val y = Math.random()
if
(x*x + y*y <
1
)
1
else
0
}.reduce(_ + _)
println(
"Pi 值大约为 "
\+
4.0
* count / NUM_SAMPLES)
---|---
结果为:
1
|
Pi 值大约为
3.141408
---|---
到目前为止,我们搭建好了一个Spark环境, 并简单进行了测试。 下一步我们使用Spark SQL分析前面所说的数据。
使用Spark SQL分析数据
这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。 当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。 实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。 反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。
先看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
val sqlContext =
new
org.apache.spark.sql.SQLContext(sc)
import
sqlContext.createSchemaRDD
case
class
Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)
val customer = sc.textFile(
"/mnt/share/2000W/*.csv"
).map(_.split(
","
)).filter(line => line.length >
7
).map(p => Customer(p(
0
), p(
5
), p(
4
), p(
6
), p(
7
))).distinct()
customer.registerTempTable(
"customer"
)
def toInt(s: String):Int = {
try
{
s.toInt
}
catch
{
case
e:Exception =>
9999
}
}
def myfun(birthday: String) : String = {
var rt =
"未知"
if
(birthday.length ==
8
) {
val md = toInt(birthday.substring(
4
))
if
(md >=
120
& md <=
219
)
rt =
"水瓶座"
else
if
(md >=
220
& md <=
320
)
rt =
"双鱼座"
else
if
(md >=
321
& md <=
420
)
rt =
"白羊座"
else
if
(md >=
421
& md <=
521
)
rt =
"金牛座"
else
if
(md >=
522
& md <=
621
)
rt =
"双子座"
else
if
(md >=
622
& md <=
722
)
rt =
"巨蟹座"
else
if
(md >=
723
& md <=
823
)
rt =
"狮子座"
else
if
(md >=
824
& md <=
923
)
rt =
"处女座"
else
if
(md >=
924
& md <=
1023
)
rt =
"天秤座"
else
if
(md >=
1024
& md <=
1122
)
rt =
"天蝎座"
else
if
(md >=
1123
& md <=
1222
)
rt =
"射手座"
else
if
((md >=
1223
& md <=
1231
) | (md >=
101
& md <=
119
))
rt =
"摩蝎座"
else
rt =
"未知"
}
rt
}
sqlContext.registerFunction(
"constellation"
, (x:String) => myfun(x))
var result = sqlContext.sql(
"SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)"
)
result.collect().foreach(println)
---|---
为了使用spark sql,你需要引入 sqlContext.createSchemaRDD
. Spark sql一个核心对象就是 SchemaRDD
。 上面的 import
可以隐式的将一个RDD转换成SchemaRDD。 接着定义了 Customer
类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。 接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。 因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数 myfun
, 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。
然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。 最后将结果打印出来。
1
2
3
4
5
6
7
8
9
10
11
12
|
[双子座,
1406018
]
[双鱼座,
1509839
]
[摩蝎座,
2404812
]
[金牛座,
1406225
]
[水瓶座,
1635358
]
[巨蟹座,
1498077
]
[处女座,
1666009
]
[天秤座,
1896544
]
[白羊座,
1409838
]
[射手座,
1614915
]
[未知,
160483
]
[狮子座,
1613529
]
---|---
看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。
我们也可以分析一下开房的男女比例:
1
2
3
|
......
result = sqlContext.sql(
"SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender"
)
result.collect().foreach(println)
---|---
结果显示男女开房的人数大约是2:1
1
2
|
[F,
6475461
]
[M,
12763926
]
---|---