Spark提高篇——RDD/DataSet/DataFrame(一)

该部分分为两篇,分别介绍RDD与Dataset/DataFrame:

一、RDD

二、DataSet/DataFrame

先来看下官网对RDD、DataSet、DataFrame的解释:

1.RDD

Resilient distributed dataset(RDD),which is a fault-tolerant collection of elements that can be operated on in parallel

RDD——弹性分布式数据集,分布在集群的各个结点上具有容错性的元素集,可以被并行处理。

参考链接:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

2. DataSet & DataFrame

A Dataset is a distributed collection of data.

DataSet——分布式数据集。

A DataFrame is a Dataset organized into named columns.

DataFrame——按列命名的分布式数据集。
 
API文档中可以看到,DataFrame其实就是指定了元素类型为Row的DataSet。而Row类型即具有确定元素个数的行结构。可以看出DataFrame其实是类似于数据库的表结构,后面可以看到对它的操作也和表的操作很类似。

type DataFrame = DataSet[Row]

 
官网更鼓励使用DataSet/DataFrame,见下文:

Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs are replaced by Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood. The RDD interface is still supported, and you can get a more complete reference at the RDD programming guide. However, we highly recommend you to switch to use Dataset, which has better performance than RDD. See the SQL programming guide to get more information about Dataset.

可以看到Spark2.0以后,DataSet取代了RDD,并具有更高的性能(其中一点便是DataSet支持sql(如select、join、union、groupBy等)操作,可以像操作数据库表/视图似的来进行数据处理)。

参考链接:http://spark.apache.org/docs/latest/quick-start.html

当然,有的场景RDD比DataSet/DataFrame更方便数据处理,比如有个数据集,每行包含很多字段,但是我们只需要获取其中的某几个字段,如果用DataSet/DataFrame,必须定义所有字段的结构,但是,如果使用RDD进行处理,直接获取每行的指定字段即可,不需要关心其他字段,后续对特定字段的操作再转换为DataSet/DataFrame处理即可,可见,RDD和DataSet结合使用有时候更方便数据数据。

下面分别对RDD、DataSet、DataFrame的使用方法进行介绍。

一.RDD

1.1 RDD操作类型

RDD操作主要分为两类:Transformations与Actions。官方将Transformations操作定义为从一个数据集中生成另一个数据集;将Actions操作定义为对数据集进行一系列计算以后返回给驱动程序一个值。可以看出数据转换(map)、合并(union)、过滤(filter)等操作均为Transformations类型,因为他们的结果仍然是一个数据集,而数据聚合(reduce)、统计(count)、保存(saveAsXXX)等操作均为Actions类型,原因是他们的最终都要将结果返回给驱动程序(即对结果进行汇总,而Transformations操作只需要在各个node/slave上执行)。

之所以要区分操作类型,是因为Transformations操作是滞后的,不会马上执行,只有当程序要返回结果给驱动程序时才会执行,所以定义了Transformations操作后立马执行println来输出某个值是得不到结果的,只有执行过Actions操作才能得到结算结果,且Actions操作会被立即执行。

官方列出的常用Transformations操作包括:map、filter、flatMap、mapPartitions、mapPartitionsWithIndex、sample、union、intersection、distinct、groupByKey、reduceBykey、aggregateByKey、sortByKey、join、cogroup、cartesian、pipe、coalsce、repartition、repartitionAndSortWithinPartitions;Actions操作包括:reduce、collect、count、first、take、takeSample、takeOrdered、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile、countByKey、foreach。具体用法可以参考官方API

1.2 生成RDD

我们可以通过SparkContext来生成RDD,下面是两种获取SparkContext实例的方法。

//1.SparkContext
val sc = new SparkContext(new SparkConf().setAppName("Spark Context"))
val rdd1 = sc.textFile("data.txt") //2.SparkSession
val spark = SparkSession.builder().appName("Spark Session").getOrCreate()
val rdd2 = spark.sparkContext.textFile("data.txt")

上例中通过textFile读取本地文件来生成RDD,textFile参数可以是HDFS路径、本地文件(非单机模式下需要每个node上都有)或者任何hadoop文件系统支持的URI;除了textFile还可以使用hadoopFile、hadoopRDD、parallelize、makeRDD来生成RDD。这里提一下,textFile支持通配符形式的path,比如hdfs://xx.xx.xx.xx:9000/path1/ds=*/*.gz,特别适用于按分区存储的数据处理。

1.3 RDD处理

下面通过一个例子演示一下RDD常用操作的用法:

下面的代码对保存在HDFS上的日志文件进行解析、过滤、统计Title字段的字节数并计算Title的最大长度。

package com.personal.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel object RDDTest {
def main(args: Array[String]): Unit = {
val MinFieldsLength = 53
val VTitleIndex = 11 val inputPath = "hdfs://192.168.1.44:9000/user/name/input/attempt_1530774346064"
val outputPath = "hdfs://192.68.1.44:9000/user/name/output/" val sparkConf = new SparkConf().setAppName("RDD Test")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile(inputPath) val lineCounter = sc.longAccumulator("LineCounter") val resultRdd = rdd.map(_.split("\t"))
.filter(
fields =>{
lineCounter.add(1)
if(fields.length < MinFieldsLength) false else true
}
)
.map(fields => fields(VTitleIndex).length)
.persist(StorageLevel.MEMORY_ONLY) resultRdd.saveAsTextFile(outputPath)
val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b) println(s"Line count: ${lineCounter.value}")
println(s"Max title length: ${maxTitleLength}") sc.stop()
}
}

例中先初始化一个SparkContext对象,然后通过textFile读取hdfs中的文件,生成一个RDD,接着调用map逐行分割字符串,再调用filter对字段数不合法的行进行过滤,接着再计算每行的Title字段长度并写入hdfs,同时使用reduce计算Title的最大长度,最后输出统计信息。根据RDD操作类型定义,文中调用map->filter->map的过程是不会马上被执行的,直到调用saveAsTextFile和reduce时才会被执行。

上例中用到了一个特殊的变量——累加器(Accumulator),通过SparkContext.longAccumulator(name: String)定义,顾名思义,只能进行加法操作,用于计数器或求总和。这类变量在Spark中称为共享变量(Shared Variables),即在集群的各个node中的值是相同的),与共享变量相反,程序中定义的其他变量在集群的各个node之间是互相独立的

除了计数器,Spark还支持另一种共享变量——广播变量(Broadcast Variables),它是只读的,被cache到每台机器中,常用于各个node之间的大规模数据分发。Spark任务是分阶段执行的,各个阶段需要的数据便是通过broadcast方式分发的,cache时进行序列化,任务执行时再反序列化。因此,只有在各个阶段需要同一份数据或需要cache反序列化后的值时才需要显式定义broadcast变量,通过调用SparkContext.broadcast(value: T)来定义。

org.apache.spark.rdd.RDD.map原型为:

def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
      Return a new RDD by applying a function to all elements of this RDD.

可以看到,map是一个高阶函数,即参数也是一个函数;第二个为隐式参数,不需要显示赋值(需要初始化spark后"import spark.implicits._"),程序会根据上下文自动赋值。map常用于对数据逐行处理,返回值是个新的RDD,处理后的结果数不变。如上例中:

val resultRdd = rdd.map(_.split("\t"))

org.apache.spark.rdd.RDD.filter原型为:

def filter(f: (T) ⇒ Boolean): RDD[T]
      Return a new RDD containing only the elements that satisfy a predicate.

同map一样,filter也是一个高阶函数,函数返回值为true时保留该数据,为false时过滤掉该数据。如上例中:

val resultRdd = rdd.map(_.split("\t"))
.filter(
fields =>{
lineCounter.add(1)
if(fields.length < MinFieldsLength) false else true
}
)

org.apache.spark.rdd.RDD.saveAsTextFile原型为:

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
      Save this RDD as a compressed text file, using string representations of elements.

def saveAsTextFile(path: String): Unit
      Save this RDD as a text file, using string representations of elements.

saveAsTextFile为Actions类型的方法,用于将rdd结果以text格式持久化到指定path下,写入的时候会检查是否path已经存在,存在则抛出异常。第二个参数用于指定压缩类型,如org.apache.hadoop.io.compress.GzipCodec、com.hadoop.compression.lzo.LzoCodec,默认不压缩。如上例中:

resultRdd.saveAsTextFile(outputPath)

org.apache.spark.rdd.RDD.reduce原型为:

def reduce(f: (T, T) ⇒ T): T
      Reduces the elements of this RDD using the specified commutative and associative binary operator.

reduce的参数为同一类型的二元操作函数,即“T <operator> T”,可用于求最值,求和等聚合需求。如上例中:

val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b)

上例中还用到了一个Spark中很重要的功能——持久化(Persistence),它将RDD持久化/缓存到各个node的内存中以加速后续的计算。可以通过调用persist() 或 cache()来使RDD持久化,cache的存储方式是反序列化后写入内存,persist的存储方式(StorageLevel)可以通过参数指定,不指定参数等同于cache,可选的存储方式包括:

类型 说明
MEMORY_ONLY 将RDD反序列化为java objects写入JVM。若无法完全写入内存,则部分partiton内的数据将在需要的时候重新计算。
MEMORY_AND_DISK 将RDD反序列化为java objects写入JVM。若无法完全写入内存,则无法写入内存的写入磁盘,需要的时候从磁盘读取。
MEMORY_ONLY_SER 将RDD序列化为java objects写入JVM。
MEMORY_AND_DISK_SER 将RDD序列化为java objects写入JVM。若无法完全写入,则无法写入内存的部分写入磁盘。
DISK_ONLY 将RDD写入磁盘
MEMORY_ONLY_2
MEMORY_AND_DISK_2
与MEMORY_ONLY、MEMORY_AND_DISK类似,但每个partition会备份到两个nodes中。
OFF_HEAP 与MEMORY_ONLY_SER类似,但是会写入堆外内存(off-heap memory),前提是启用了堆外内存。

其他操作可以查阅API文档或其他资料,这里不再举例。

上一篇:less用法小结


下一篇:SQL日期格式转换(经常用又经常忘记的东西)转载自http://www.cnblogs.com/wangyuelang0526/archive/2012/06/06/2538224.html