Spark 4 - RDD Usage

How to create an RDD

1. Creation methods

1. Parallelizing an existing collection in your driver program

In other words, an existing collection is parallelized and converted into an RDD.

2. Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In other words, the data comes from an external storage system such as HDFS or HBase.

2. Creating RDDs

2.1 From a collection (array)

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distData.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)

In the web UI we can see how many tasks were executed. The default number of tasks can be configured in the spark-defaults.conf file under spark/conf:

spark.default.parallelism 24
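
The same setting can also be passed programmatically when building a SparkContext in a standalone application (a minimal sketch; the app name and master below are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// In a standalone application (not spark-shell, where sc already exists),
// spark.default.parallelism can be set on the SparkConf directly.
val conf = new SparkConf()
  .setAppName("rdd-demo")            // hypothetical app name
  .setMaster("local[*]")             // hypothetical master, for illustration
  .set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)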

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster.

val distData = sc.parallelize(data, 5)

When parallelizing, the dataset is split into the requested number of partitions. For example, if you specify 5, the web UI also shows 5 tasks, so in Spark the number of partitions equals the number of tasks.
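
A quick way to confirm this in spark-shell is to ask the RDD for its partition count (using the distData defined just above):

distData.getNumPartitions   // returns 5: one task is scheduled per partition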

2.2 External datasets

Use sc.textFile("path") to point at a local file as the data source:

scala> val testData = sc.textFile("file:///opt/scripts/check_yarn.txt")
testData: org.apache.spark.rdd.RDD[String] = file:///opt/scripts/check_yarn.txt MapPartitionsRDD[10] at textFile at <console>:24

scala> testData.collect
res6: Array[String] = Array(lyhU51BaseWebBankStg, lyhClientContactStg, lyhNfcsStg, lyhU51AdditionAppStg, lyhApplicationStg, lyhU51BaseOperatorStg, lyhU51AdditionInfoStg, lyhAppCrossStg, lyhMoxieCarrierStg)

Use sc.textFile("path") to point at an HDFS data source:

scala> val testData = sc.textFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt")
testData: org.apache.spark.rdd.RDD[String] = hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt MapPartitionsRDD[18] at textFile at <console>:24

scala> testData.collect
res11: Array[String] = Array(lyhU51BaseWebBankStg, lyhClientContactStg, lyhNfcsStg, lyhU51AdditionAppStg, lyhApplicationStg, lyhU51BaseOperatorStg, lyhU51AdditionInfoStg, lyhAppCrossStg, lyhMoxieCarrierStg)

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

If you point at a directory, the path only needs to go down to the directory itself; you can also target specific files with a wildcard.
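
For example, with the same HDFS directory used above (the *.txt glob is only an illustration):

// Read every file under the directory
val dirData = sc.textFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs")

// Read only files matching a wildcard pattern
val txtData = sc.textFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/*.txt")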

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

textFile minPartitions: textFile() accepts an optional second argument after the file path that controls the number of partitions. By default one partition is created per block of the file, but you can request more partitions by passing a larger value; you can never get fewer partitions than blocks.
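
For instance, the same file can be read with a minimum of 10 partitions (a sketch; Spark may choose slightly more partitions than requested, never fewer than the block count):

val testData = sc.textFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt", 10)
testData.getNumPartitions   // at least 10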

wholeTextFiles

scala> val testData = sc.wholeTextFiles("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs")
testData: org.apache.spark.rdd.RDD[(String, String)] = hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs MapPartitionsRDD[20] at wholeTextFiles at <console>:24

scala> testData.collect
res12: Array[(String, String)] =
Array((hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt,"lyhU51BaseWebBankStg
lyhClientContactStg
lyhNfcsStg
lyhU51AdditionAppStg
lyhApplicationStg
lyhU51BaseOperatorStg
lyhU51AdditionInfoStg
lyhAppCrossStg
lyhMoxieCarrierStg
"))

Unlike textFile() above, wholeTextFiles() returns key-value pairs of (filename, content), so you can see each file's full path as well as its contents.
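
Because the result is a pair RDD, the usual key/value operations apply; a small sketch using the testData from above (the line-count step is just an illustration):

// Collect only the file paths (the keys)
val fileNames = testData.keys.collect()

// Count the number of lines in each file's content
val lineCounts = testData.mapValues(content => content.split("\n").length).collect()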

2.3 Saving an RDD to files

scala> testData.saveAsTextFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/testData")
# The saved files can then be seen in the corresponding HDFS directory
# hadoop fs -ls /user/hdfs/testData
Found 3 items
-rw-r--r--   2 root supergroup          0 2019-05-06 17:23 /user/hdfs/testData/_SUCCESS
-rw-r--r--   2 root supergroup         91 2019-05-06 17:23 /user/hdfs/testData/part-00000
-rw-r--r--   2 root supergroup         78 2019-05-06 17:23 /user/hdfs/testData/part-00001

# hadoop fs -text /user/hdfs/testData/*
lyhU51BaseWebBankStg
lyhClientContactStg
lyhNfcsStg
lyhU51AdditionAppStg
lyhApplicationStg
lyhU51BaseOperatorStg
lyhU51AdditionInfoStg
lyhAppCrossStg
lyhMoxieCarrierStg

You can see that the result was saved, but two output files were produced. In general, saveAsTextFile writes one file per task (part-00000, part-00001, and so on), so the number of part files equals the number of tasks, i.e. the number of partitions of the final stage. To get the result into a single file, call coalesce(1, true).saveAsTextFile() on the RDD, or alternatively repartition(1), before saving, as shown below.
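
A sketch of both approaches (the output paths below are hypothetical, modeled on the directory used above):

// Collapse to a single partition (with a shuffle), then save
testData.coalesce(1, shuffle = true)
  .saveAsTextFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/testData_single")

// Or equivalently, repartition to 1 before saving
testData.repartition(1)
  .saveAsTextFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/testData_single2")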

Tip: if the data is large and does not fit in a single machine's memory, the operations above (collapsing everything into one partition) may cause that machine to run out of memory.
