一个spark SQL和DataFrames的故事

package com.lin.spark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
  * Created by Yaooo on 2019/6/8.
  */
object SparkSQLExample {
  case class Person(name:String,age:Long)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL")
      .config("spark.come.config.option","some-value")
        .master("local[2]")
      .getOrCreate()

    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)
    runInferSchemaExample(spark)
    runProgrammaticSchemaExample(spark)
  }
  private def runProgrammaticSchemaExample(spark:SparkSession): Unit ={
    import spark.implicits._
    val personRDD = spark.sparkContext.textFile("src/main/resources/people.txt")

    val schemaString = "name age"

    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))

    val schema = StructType(fields)

    val rowRDD = personRDD
      .map(_.split(","))
      .map(att => Row(att(0),att(1).trim))

    val peopleDF = spark.createDataFrame(rowRDD,schema)

    peopleDF.createOrReplaceTempView("people")

    val results = spark.sql("select * from people")

    results.map(att=>"Name : "+att(0)).show()

  }

  private def runInferSchemaExample(spark:SparkSession): Unit ={
    import spark.implicits._
    val personDF = spark.sparkContext
      .textFile("src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0),attributes(1).trim.toInt))
      .toDF()

    personDF.createOrReplaceTempView("people")

    val teenagersDF = spark.sql("select * from people where age between 13 and 19")
    teenagersDF.show()
    teenagersDF.map(teenager =>"name: "+teenager(0)).show()
    teenagersDF.map(teenager => "Name: "+ teenager.getAs[String]("name")).show()

    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name","age"))).collect()
      .foreach(println)
  }

  private def runDatasetCreationExample(spark:SparkSession): Unit ={
    import spark.implicits._
    val caseClassDS = Seq(Person("Andy",18)).toDF()
    caseClassDS.show()

    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_+1).collect().foreach(println)

    val path = "src/main/resources/person.json"
    val personDS = spark.read.json(path).as[Person]
    personDS.show()
  }

  private def runBasicDataFrameExample(spark:SparkSession): Unit ={
    import spark.implicits._
    val df = spark.read.json("src/main/resources/person.json")
    df.show()
    df.printSchema()
    df.select("name").show()
    df.select($"name",$"age"+1).show()
    df.filter($"age">21).show()
    df.groupBy($"age").count().show()

    /*df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("select * from people")
    sqlDF.show()*/

    df.createOrReplaceGlobalTempView("people")
    spark.sql("select * from global_temp.people").show()
  }
}

 

上一篇:python连接数据库(mysql、oracle、sqlserver)遇到的那些坑。。。


下一篇:pandas,读取或存储DataFrames的数据到mysql中