算子map

package sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo02Map {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("map").setMaster("local")

val sc = new SparkContext(conf)

/**
* map:一行一行处理rdd的数据
*/

/**
* 构建RDD的方法
* 1、读取文件
* 2、基于Scala集合构建RDD
*/

//基于Scal集合构建rdd
val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

println("listRDD分区数据" + listRDD.getNumPartitions)

val mapRDD: RDD[Int] = listRDD.map(i => i * 2)

//打印RDD中的数据
mapRDD.foreach(println)

/**
* mapValues;处理kv格式rdd的value
*/
//转换成kv格式
val kvRDD: RDD[(Int, Int)] = listRDD.map(i => (i, i))
//对v进行*2
val mapValuesRDD: RDD[(Int, Int)] = kvRDD.mapValues(i => i * 2)

mapValuesRDD.foreach(println)

/**
* mapPartitions:一次处理一个分区的数据,返回值需要是一个迭代器
* mapPartitionsWithIndex:uole一个下标
*
*/
val mapPartitionsRDD: RDD[Int] =listRDD.mapPartitions((iter:Iterator[Int])=>{
val list: List[Int] = iter.toList
list.map(i=>i*2).toIterator
})
mapPartitionsRDD.foreach(println)

val mapPartitionsWithIndexRDD: RDD[Int] =listRDD.mapPartitionsWithIndex{
case (index:Int,iter:Iterator[Int])=>
println("当前分区编号:"+index)
iter
}
mapPartitionsWithIndexRDD.foreach(println)

}

}
上一篇:python pyppeteer util 设置标签页面的大小


下一篇:文件上传合法性检查(病毒检查 --基础版)