045 RDD与DataFrame互相转换

一:RDD与DataFrame互相转换

1.总纲

  045 RDD与DataFrame互相转换

二:DataFrame转换为RDD

1.rdd

  使用schema可以获取DataFrame的schema

  使用rdd可以获取DataFrame的数据

三:RDD转换为DataFrame

1.第一种方式

  使用反射,

  RDD的数据类型必须是case class。

     import sqlContext.implicits._                //如果不写,下面的转换不成功

     //transform
val path="/spark/logs/input"
val rdd=sc.textFile(path)
val apacheAccessDataFrame=rdd
.filter(line=>ApacheAccessLog.isValidateLogLine(line))
.map(line => {
ApacheAccessLog.parseLogLine(line)
}).toDF() //rdd转换为DataFrame

  其中,ApacheAccessLog.parseLogLine(line)是case class类型。

2:第二种方式

 package com.scala.it
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object CreateDataFrameDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("hive-join-mysql")
// 使用kryo序列化机制
conf.registerKryoClasses(Array(classOf[Row], classOf[Tuple3[Int, String, Double]]))
val sc = SparkContext.getOrCreate(conf) val sqlContext = new SQLContext(sc) // ===================================
// RDD中Row中的各个列的类型必须是一致的(不能有歧义)
val rdd: RDD[Row] = sc.parallelize(Array(
(1, "Tom", 1234.1),
(2, "Lili", 12532.2),
(3, "Gerry", 123.0)
)).map {
case (id, name, salary) => {
Row(id, name, salary)
}
}
val schema: StructType = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("salary", DoubleType)
)) val df = sqlContext.createDataFrame(rdd, schema)
df.show()
}
}

3.解释上面的程序

  产生RDD有两种方式,读取数据源,或者序列化

  这里使用序列化产生RDD。

  -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

  关于rdd中为什么要使用Row:

  045 RDD与DataFrame互相转换

  ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

  关于StructType:

  045 RDD与DataFrame互相转换

  -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

  关于StructField:

    其中,后两个是默认参数,可以不给。

  045 RDD与DataFrame互相转换

  

上一篇:oschina(开源中国)的Git代码托管平台使用教程


下一篇:贝叶斯、朴素贝叶斯及调用spark官网 mllib NavieBayes示例