package com.bawei.foryk

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// One row of the input file, expected to look like: name,sex,age
case class Student(name: String, sex: String, age: Int)

object SparkSqlReview01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlTraffic01")
      .master("local")
      .getOrCreate()

    // Read the file into an RDD of lines
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/data.txt")

    // Parse each comma-separated line into a Student
    val studentRDD: RDD[Student] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      Student(strings(0), strings(1), strings(2).toInt)
    })

    // Convert the RDD of case classes to a DataFrame via the implicit encoders
    import spark.implicits._
    val studentDF: DataFrame = studentRDD.toDF()

    // Register a temporary view so the data can be queried with SQL
    studentDF.createOrReplaceTempView("student")

    val resultDF: DataFrame = spark.sql("select * from student where age < 20")

    // JDBC connection properties for MySQL
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "")

    // Write the query result to the MySQL table `student`
    resultDF.write.jdbc(
      "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8",
      "student",
      prop)

    spark.stop()
  }
}
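Note that DataFrameWriter.jdbc uses SaveMode.ErrorIfExists by default, so rerunning this job against an existing student table fails. A minimal sketch of making the write rerunnable, assuming appending duplicate rows is acceptable here:

    import org.apache.spark.sql.SaveMode

    // Append to the existing table instead of erroring when it already exists
    resultDF.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)

SaveMode.Overwrite would instead drop and recreate the table on each run.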
Follow-up topics: Spark SQL user-defined functions (UDF) and window functions (row_number() over ...), with results written out to a file or to MySQL; see the sketch below.
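A minimal sketch of those follow-ups, reusing the spark session, the student view, and the JDBC prop from the code above; the function name sexLabel, the "m" sex encoding, the output path ./traffic/labeled, and the target table student_rank are all assumptions for illustration:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // UDF: register a Scala function under a SQL-callable name
    // (sexLabel and the "m" encoding are made-up examples)
    spark.udf.register("sexLabel", (sex: String) => if (sex == "m") "male" else "female")
    val labeledDF: DataFrame = spark.sql("select name, sexLabel(sex) as sex, age from student")

    // Window function: number the students within each sex, oldest first
    val rankedDF: DataFrame = spark.sql(
      """select name, sex, age,
        |       row_number() over (partition by sex order by age desc) as rn
        |from student""".stripMargin)

    // Output to a file (JSON here; the path is illustrative)
    labeledDF.write.mode(SaveMode.Overwrite).json("./traffic/labeled")

    // Output to MySQL, appending to an assumed table student_rank
    rankedDF.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "student_rank", prop)

row_number() assigns 1..n within each partition with no ties, which is what makes it useful for top-N-per-group queries; rank() and dense_rank() differ only in how they handle ties.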