首先导入maven依赖
<!-- Spark core and Spark SQL; the _2.11 suffix is the Scala binary version of the artifacts,
     and both artifacts are pinned to the same Spark version. -->
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
DataFrame
package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/** Minimal Spark SQL walkthrough: RDD -> DataFrame -> SQL query -> Dataset -> RDD[Row]. */
object Demo1 {

  /** Record type used when viewing the DataFrame as a typed Dataset. */
  case class User(name: String, age: Int)

  /**
   * Builds a local SparkSession, turns a small in-memory RDD into a DataFrame,
   * queries it through a temporary view, and prints every row on the driver.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Build (or reuse) the SparkSession; "local[*]" runs Spark inside this JVM.
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    try {
      // Brings toDF / as[T] and the implicit encoders of this session into scope.
      import spark.implicits._

      val raw: RDD[(String, Int)] =
        spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
      val df: DataFrame = raw.toDF("name", "age")
      df.show()

      // Register the DataFrame as a temporary view so it can be queried with SQL.
      df.createOrReplaceTempView("user")

      // Run the SQL query against the temporary view.
      val selectUser: DataFrame = spark.sql("select * from user where age > 21")
      selectUser.show()

      // DataFrame -> typed Dataset (kept to demonstrate the conversion).
      val ds: Dataset[User] = df.as[User]

      // DataFrame -> RDD[Row].
      val rdd: RDD[Row] = df.rdd

      // collect() first: a println inside RDD.foreach runs on the executors, so on a
      // cluster nothing would reach the driver's stdout and the order would be arbitrary.
      // Row getters are 0-based.
      rdd.collect().foreach { row =>
        println(row.getString(0))
        println(row.getInt(1))
      }
    } finally {
      // Always release the session, even if one of the steps above throws.
      spark.stop()
    }
  }
}
注意:
1)sparksession的创建不能用new SparkSession的方式,而是利用伴生对象SparkSession来创建builder,通过builder来创建sparksession。
2)隐式转换import spark.implicits._并不是引入了一个包,spark指的是程序上下文环境中的SparkSession对象(即上面用val定义的spark,必须是val才能被import),implicits是定义在该对象内部的一个object,_是通配符,表示导入implicits中的全部成员(各种隐式转换和Encoder)。如果该对象改名为sparksession,相应的隐式转换语句变为import sparksession.implicits._
3)row对象的get方法的下标索引是从0开始,而不像jdbc的resultset下标索引从1开始。
Dataset
package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Minimal Dataset walkthrough: Seq of case classes -> Dataset -> DataFrame / RDD. */
object Demo2 {

  /** Element type of the typed Dataset built in `main`. */
  case class People(name: String, age: Int)

  /**
   * Builds a local SparkSession, creates a Dataset[People] from an in-memory Seq,
   * shows it as a DataFrame, and prints every element on the driver.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Build (or reuse) the SparkSession; "local[*]" runs Spark inside this JVM.
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    try {
      // Brings toDS and the implicit encoders of this session into scope.
      import spark.implicits._

      // Create the Dataset from a local collection.
      val peopleDataset =
        Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

      // Dataset -> DataFrame.
      val peopleDataframe: DataFrame = peopleDataset.toDF()
      peopleDataframe.show()

      // Dataset -> RDD; the element type is preserved.
      val rdd: RDD[People] = peopleDataset.rdd

      // collect() first: a println inside RDD.foreach runs on the executors, so on a
      // cluster nothing would reach the driver's stdout and the order would be arbitrary.
      rdd.collect().foreach { people =>
        println(s"${people.name}\t${people.age}")
      }
    } finally {
      // Always release the session, even if one of the steps above throws.
      spark.stop()
    }
  }
}