idea开发SparkSQL程序

首先导入maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

 

 

dataframe

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


/**
 * DataFrame walkthrough: builds a DataFrame from an RDD of tuples, queries it
 * through a temporary view with SQL, then converts it to a typed Dataset and
 * back to an RDD of rows.
 */
object Demo1 {

  /** Record type used when viewing the DataFrame as a typed Dataset. */
  case class User(name: String, age: Int)

  /**
   * Runs the example against a local, in-process Spark master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Build (or reuse) the SparkSession; this is the entry point, not a SparkConf.
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // try/finally guarantees the session is stopped even if a step below throws.
    try {
      // Brings the implicit conversions (toDF, encoders) of this session into scope.
      import spark.implicits._

      val raw: RDD[(String, Int)] =
        spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))

      // Name the tuple fields so they can be referenced as columns.
      val df: DataFrame = raw.toDF("name", "age")
      df.show()

      // Register the DataFrame as a temporary view so it can be queried with SQL.
      df.createOrReplaceTempView("user")

      val selectUser: DataFrame = spark.sql("select * from user where age > 21")
      selectUser.show()

      // DataFrame -> typed Dataset; kept only to demonstrate the conversion.
      val ds: Dataset[User] = df.as[User]

      // DataFrame -> RDD[Row].
      val rdd: RDD[Row] = df.rdd

      // Collect to the driver before printing: printing inside an RDD foreach
      // runs in parallel tasks, so the name/age lines of different rows can
      // interleave (and would not reach the driver console on a cluster).
      // Collecting is only appropriate because this data set is tiny.
      rdd.collect().foreach { row =>
        // Row field indices are 0-based.
        println(row.getString(0))
        println(row.getInt(1))
      }
    } finally {
      spark.stop()
    }
  }

}

注意:

1)sparksession的创建不能用new SparkSession的方式,而是利用伴生对象SparkSession来创建builder,通过builder来创建sparksession。

2)隐式转换import spark.implicits._不是引入了一个包,spark指的是程序上下文环境中的sparksession对象(必须是val声明的稳定标识符),所以这里引入的是该对象内部的implicits对象,_是通配符,表示导入implicits中的所有成员(各种隐式转换和Encoder)。如果该对象改名为sparksession,相应的隐式转换语句变为import sparksession.implicits._

3)row对象的get方法的下标索引是从0开始,而不像jdbc的resultset下标索引从1开始

 

DataSet

 

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
 * Dataset walkthrough: builds a typed Dataset from a local sequence of case
 * class instances, converts it to a DataFrame, and converts it to an RDD.
 */
object Demo2 {

  /** Record type stored in the Dataset. */
  case class People(name: String, age: Int)

  /**
   * Runs the example against a local, in-process Spark master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Build (or reuse) the SparkSession; this is the entry point, not a SparkConf.
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // try/finally guarantees the session is stopped even if a step below throws.
    try {
      // Brings the implicit conversions (toDS, encoders) of this session into scope.
      import spark.implicits._

      // Local Seq of case class instances -> typed Dataset.
      val peopleDataset =
        Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

      // Dataset -> DataFrame.
      val peopleDataframe: DataFrame = peopleDataset.toDF()
      peopleDataframe.show()

      // Dataset -> RDD; the element type stays People.
      val rdd: RDD[People] = peopleDataset.rdd

      // Collect to the driver before printing: printing inside an RDD foreach
      // runs in parallel tasks, so the output order is not deterministic (and
      // would not reach the driver console on a cluster). Collecting is only
      // appropriate because this data set is tiny.
      rdd.collect().foreach { people =>
        println(s"${people.name}\t${people.age}")
      }
    } finally {
      spark.stop()
    }
  }
}

 

上一篇:SparkSQL 如何自定义函数


下一篇:适合小白入门的IDEA开发SparkSQL详细教程