map
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
Returns a new distributed dataset formed by passing each element of the source through the function func.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(println)
println("=========================")
rdd.map(x => (x,1)).foreach(println)
Result:
67891012345
=========================
(6,1)(7,1)(8,1)(9,1)(10,1)(1,1)(2,1)(3,1)(4,1)(5,1)
filter
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
Filters the elements with a user-defined function.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(print)
val rdd2 = rdd.filter(_>6)
println("=========================")
rdd2.foreach(print)
Result:
67891012345
=========================
78910
flatMap
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
Maps each element of the RDD to zero or more elements via a user-defined function and returns them as a flattened collection.
val ds = session.sparkContext.textFile("D:/公司/test.txt")
ds.foreach(println)
val ds2 = ds.flatMap(x => {
  x.toString().split(":")
})
println("===================")
ds2.foreach(println)
Result:
{ "DEVICENAME": "����4", "LID": 170501310, "ADDRESS": "xxxx", "ID": 230001160 }
===================
{ "DEVICENAME"
"����4", "LID"
170501310, "ADDRESS"
"xxxx", "ID"
230001160 }
mapPartitions
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
Similar to map, but it runs per partition, like a batch operation.
mapPartitionsWithIndex
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
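Neither mapPartitions nor mapPartitionsWithIndex is demonstrated above, so here is a minimal sketch, assuming the same session used in the other examples; the partition count of 3 is an arbitrary choice.
val rdd = session.sparkContext.parallelize(1 to 10, 3)
// mapPartitions: func receives an Iterator over one whole partition, like a batch
val perPartitionSums = rdd.mapPartitions(iter => Iterator(iter.sum))
perPartitionSums.foreach(println) // one sum per partition, e.g. 6 15 34
// mapPartitionsWithIndex: func also receives the partition index
val tagged = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
tagged.foreach(println) // e.g. (0,1) (0,2) (0,3) (1,4) ... (2,10)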
sample
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
Draws a random sample from an RDD.
It takes three parameters (a short sketch follows):
withReplacement: a boolean indicating whether elements may be sampled more than once (sampling with replacement).
fraction: the expected fraction of elements to return, between 0 and 1. For example, if the original RDD has 10 elements and fraction = 0.5, a random RDD of roughly 5 elements is returned.
seed: a seed for the random number generator; by default a random value is used. If a fixed seed is supplied, repeated runs return the same sample.
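A minimal sample sketch, assuming the same session as above; only a fixed seed makes the output repeatable.
val rdd = session.sparkContext.parallelize(1 to 10)
// with replacement, roughly half the elements, fixed seed for repeatable output
rdd.sample(withReplacement = true, fraction = 0.5, seed = 42L).foreach(println)
// without replacement, random seed: a different subset on each run
rdd.sample(withReplacement = false, fraction = 0.3).foreach(println)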
union
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
Combines two RDDs without removing duplicates.
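union has no dedicated example above, so here is a minimal sketch (same session assumed) showing that duplicates are kept:
val a = session.sparkContext.parallelize(Seq(1, 2, 3))
val b = session.sparkContext.parallelize(Seq(3, 4, 5))
a.union(b).foreach(println) // 1 2 3 3 4 5 in some order; 3 appears twice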
intersection
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
Returns the intersection of two RDDs, with duplicates removed.
val rdd = session.sparkContext.parallelize(1 to 10)
rdd.foreach(println)
val rdd2 = rdd.sample(true, 0.5)
println("==============")
rdd2.foreach(println)
val rdd3 = rdd.intersection(rdd2)
println("==============")
rdd3.foreach(println)
Result:
==============
89955
==============
958
distinct
distinct([numTasks]) | Return a new dataset that contains the distinct elements of the source dataset. |
Removes duplicate elements from an RDD. The argument is the number of tasks (partitions).
Internally it groups the elements by value and keeps one from each group, as the source shows:
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
val rdd = session.sparkContext.parallelize(1 to 10)
val rdd2 = rdd.sample(true, 0.5)
rdd2.foreach(print)
println("====================")
val rdd3 = rdd2.distinct(10)
rdd3.foreach(print)
Result:
7792224
==============
4279
groupByKey
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks. |
Groups a dataset of (K, V) pairs by key and returns an Iterable of the values for each key. This operator requires an RDD of (K, V) pairs.
The official recommendation is that, when grouping in order to aggregate, reduceByKey or aggregateByKey should be used instead.
It behaves like SQL's GROUP BY and cannot take a user-defined function; if you need a count or similar computation on top of the grouping, use reduceByKey or aggregateByKey.
groupBy
groupBy differs slightly from groupByKey. 1: groupBy lets you define the key with a function; 2: in the return value, groupBy returns [key, {(key, value1), (key, value2)}], while groupByKey returns [key, {value1, value2}].
import scala.util.Random

val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
val rdd2 = rdd.map(x => (x, 1)).groupBy(_._1) // by default the element is the key; same as groupByKey, but the return value differs slightly
rdd2.foreach(println)
println("==============")
val rdd4 = rdd.map(x => (x, 1)).groupBy(x => {
  x._1 + new Random().nextInt(100) // the key can be user-defined
})
rdd4.foreach(println)
println("==============")
val rdd3 = rdd.map(x => (x, 1)).groupByKey() // the element itself is the key
rdd3.foreach(println)
Result:
(spark,CompactBuffer((spark,1), (spark,1)))
(hadoop,CompactBuffer((hadoop,1)))
==============
(spark92,CompactBuffer((spark,1)))
(hadoop72,CompactBuffer((hadoop,1)))
(spark46,CompactBuffer((spark,1)))
==============
(spark,CompactBuffer(1, 1))
(hadoop,CompactBuffer(1))
reduceByKey
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
For an RDD of (K, V) pairs, returns an RDD with one pair per distinct key, where the value for each key is aggregated by the reduce function passed as the first argument.
val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
val rdd2 = rdd.map(x => (x, 1)).reduceByKey(_+_)
rdd2.foreach(println)
Result:
(spark,2)
(hadoop,1)
Summary: groupBy, groupByKey, reduceByKey
1: groupBy lets you define the key with a function; in the return value, groupBy returns [key, {(key, value1), (key, value2)}], while groupByKey returns [key, {value1, value2}].
2: The first argument of reduceByKey(func, [numTasks]) is a user-defined function, so the grouped values can be processed further. groupBy([numTasks]) and groupByKey([numTasks]) take no function; to implement something like word count with them, you need additional operators or custom code.
3: reduceByKey and groupByKey differ internally, as the official comments below make clear: reduceByKey merges values on each node before they are shuffled, "similarly to a 'combiner' in MapReduce".
reduceByKey
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
groupByKey
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*
* @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Example: the classic word count implemented in all three ways.
val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
//reduceByKey
val rdd_reduceByKey = rdd.map((_,1)).reduceByKey(_+_).foreach(println)
//groupby
val rdd_groupby = rdd.map((_,1)).groupBy(_._1).map(x=>{
(x._1,x._2.map(x => {x._2}))
}).map(x => (x._1,x._2.sum)).foreach(println)
//groupByKey
val rdd_groupByKey = rdd.map((_,1)).groupByKey().map(x=>(x._1,x._2.sum)).foreach(println)
Result:
(spark,2)
(hadoop,1)
aggregateByKey
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
This function is similar to reduceByKey. It takes three parameters: the first is an initial (zero) value, and the other two are user-defined functions.
On an RDD of key-value pairs, the values are grouped and merged by key. Within each partition, every value is combined with the zero value (and the running result) by the seq function, producing a partial result per key; the partial results are then merged across partitions by key with the combine function (the first two values are combined, the result is combined with the next value, and so on), and each key with its final result is emitted as a new key-value pair. In client mode, remember to set the number of cores for the master.
/**
 * a: the accumulated value for the key; b: the value from the pair (rdd._2)
 */
def seqs(a: Int, b: Int): Int = b

def comb(a: Int, b: Int): Int = a + b

def aggregateByKey(session: SparkSession) {
val seq = Seq[String]("spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq).map((_,1))
//("spark",1)
rdd.aggregateByKey(2)(seqs, comb).foreach(println)
}
Result:
(spark,2)
(hadoop,1)
sortByKey sortBy
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
Sorting operators: sortBy works on a plain RDD, while sortByKey works on an RDD of (K, V) pairs. When checking sorted output in client mode, remember to set the number of cores for the master.
val seq = Seq[String]("spark", "hadoop", "apark","spark", "hadoop", "spark")
val rdd = session.sparkContext.parallelize(seq)
rdd.sortBy(x => x, true, 1).foreach(println)
println("===============")
rdd.map((_, 1)).sortByKey(true, 1).foreach(println)
Result:
apark
hadoop
hadoop
spark
spark
spark
===============
(apark,1)
(hadoop,1)
(hadoop,1)
(spark,1)
(spark,1)
(spark,1)
join leftOuterJoin rightOuterJoin fullOuterJoin
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
join is equivalent to an inner join; the others behave like their SQL counterparts.
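A minimal sketch of the four joins, assuming the same session; the keys and values are made up for illustration.
val left  = session.sparkContext.parallelize(Seq(("spark", 1), ("hadoop", 2)))
val right = session.sparkContext.parallelize(Seq(("spark", "x"), ("flink", "y")))
left.join(right).foreach(println)           // (spark,(1,x)): inner join, only keys present on both sides
left.leftOuterJoin(right).foreach(println)  // (spark,(1,Some(x))) (hadoop,(2,None))
left.rightOuterJoin(right).foreach(println) // (spark,(Some(1),x)) (flink,(None,y))
left.fullOuterJoin(right).foreach(println)  // all keys, with Option on both sides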
cogroup
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
cogroup is similar to a full outer join (fullOuterJoin). The difference is that cogroup groups the values by key, and it can join more than two RDDs.
val seq = Seq[String]("spark", "hadoop", "apark", "spark", "hadoop", "spark")
val seq2 = Seq[String]("spark1", "hadoop", "apark1")
val rdd = session.sparkContext.parallelize(seq).map((_,1))
val rdd2 = session.sparkContext.parallelize(seq2).map((_,1))
rdd.fullOuterJoin(rdd2).foreach(println)
println("===================")
rdd.cogroup(rdd2).collect().foreach(println)
Result:
(apark,(Some(1),None))
(apark1,(None,Some(1)))
(spark1,(None,Some(1)))
(spark,(Some(1),None))
(spark,(Some(1),None))
(spark,(Some(1),None))
(hadoop,(Some(1),Some(1)))
(hadoop,(Some(1),Some(1)))
===================
(spark1,(CompactBuffer(),CompactBuffer(1)))
(spark,(CompactBuffer(1, 1, 1),CompactBuffer()))
(hadoop,(CompactBuffer(1, 1),CompactBuffer(1)))
(apark1,(CompactBuffer(),CompactBuffer(1)))
(apark,(CompactBuffer(1),CompactBuffer()))
cartesian
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
Cartesian product.
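A minimal sketch, assuming the same session; the result size is the product of the two input sizes.
val nums = session.sparkContext.parallelize(Seq(1, 2))
val letters = session.sparkContext.parallelize(Seq("a", "b"))
nums.cartesian(letters).foreach(println) // (1,a) (1,b) (2,a) (2,b)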
pipe
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
Invokes an external program.
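A minimal sketch, assuming the same session; cat is just an illustrative command that echoes its input and requires a Unix-like environment.
val rdd = session.sparkContext.parallelize(Seq("spark", "hadoop", "flink"))
// each element is written to the command's stdin; its stdout lines come back as an RDD[String]
rdd.pipe("cat").foreach(println) // spark hadoop flink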
coalesce
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
repartition
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
Both functions repartition an RDD. coalesce takes a parameter that controls whether a shuffle is performed, while repartition always shuffles.
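A minimal sketch of the difference, assuming the same session; the partition counts are arbitrary.
val rdd = session.sparkContext.parallelize(1 to 100, 8)
val narrowed = rdd.coalesce(2)                 // shrink to 2 partitions, no shuffle by default
val shuffled = rdd.coalesce(2, shuffle = true) // same target, but with a full shuffle
val widened = rdd.repartition(16)              // always shuffles; can also increase the partition count
println(narrowed.getNumPartitions) // 2
println(widened.getNumPartitions)  // 16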