Spark RDD 简介与操作
RDD 简介
RDD 即弹性分布式数据集(Resilient Distributed Databases),它是 Spark 对数据的核心抽象,也就是 Spark 对于数据进行处理的基本单位。
使用 Spark 对数据进行处理首先需要把数据转换为 RDD,然后在 RDD 上对数据进行操作。RDD 有两种算子,分别是转换和行动。
在 Spark 中,对数据处理的操作流程就是:创建 RDD、对 RDD 进行转化操作、然后执行行动操作求值。
通过对 RDD 进行操作,隐藏了 Spark 底层各节点通信、协调、容错细节,使得操纵数据更加的轻松方便。
连接并初始化 Spark
我的环境用的是 Spark 2.4.4 和 Scala 2.11.8 版本可以匹配。
连接 Spark
如果想在外部集成编译环境中编写一个 Spark 应用,你需要在 Spark 上添加一个 Maven 依赖项。在 Maven 的*仓库可作如下修改:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.4.4
另外,如果你想要连接到 HDFS 集群,则需要在hadoop-client
配置文件中为 HDFS 版本添加依赖项。
例如如下修改(一些常见的 HDFS 版本标签在官方的Third Party Distributions 页面可以查看到):
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,你需要将一些 Spark 中的类引入到你的程序中,例如:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
如果 Scala 版本没问题,我们直接运行
spark-shell
即可。
初始化 Spark
开发 Spark 程序第一件事就是创建一个SparkConf
对象,这个对象包含应用的一些信息,然后创建SparkContext
,SparkContext
可以让 Spark 知道如何访问集群:
val conf = new SparkConf().setAppName("test").setMaster("spark://master:7077")
new SparkContext(conf)
// 在每个JVM中,只有一个SparkContext能够被激活。
// 若需要创建新的SparkContext,你必须调用sc.stop()来关闭当前已激活的那个
上面的例子表示创建一个名字为Shiyanlou
的应用,并且连接到指定的 Spark 集群上。
使用 Spark Shell
Spark shell 中我们不需要这样初始化,因为 Spark Shell 相当于一个 Spark 应用,启动时已经用过spark-shell --master spark://master:7077
来指定集群信息,所以 Spark Shell 启动后已经具备了一个 SparkContext 对象sc
,可以在 Spark shell 中输入sc
测试下。
注意了!
要是集群的方式启动,此次 spark-shell Application才会出现网址上( ip:8088 )
如果需要让 Spark Shell 中可以使用某些 jar 模块,可以通过下面的命令来指定:
spark-shell --master spark://master:7077 --jars code.jar
然后可以通过 maven 坐标来引用一个依赖,就像这样:
spark-shell --master spark://master:7077 --packages "org.example:example:0.1"
如果需要查看完整的选项列表,你可以执行命令spark-shell --help
。
在yarn 上启动可以添加参数 : --master yarn-client
RDD 的系列操作
主要包括 spark RDD 数据源、RDD 的基础操作、RDD 简单实例、转化操作以及行动操作。