package com.spark_sql

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads the `student` table from a local MySQL database over JDBC
 * and prints its rows to the console.
 */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // Build a local SparkSession (2 worker threads) for this job.
    val session: SparkSession = SparkSession
      .builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()

    // JDBC credentials for the MySQL connection.
    val connProps = new Properties()
    connProps.setProperty("user", "root")
    connProps.setProperty("password", "123")

    // Load the `student` table into a DataFrame.
    val studentTable: DataFrame = session.read.jdbc(
      "jdbc:mysql://localhost:3306/spark?serverTimezone=UTC",
      "student",
      connProps
    )

    // Display the table contents (default: first 20 rows).
    studentTable.show()

    session.stop()
  }
}
package com.spark_sql

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads the `student` table from a local MySQL database over JDBC
 * and prints its contents.
 */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession (local mode, 2 threads).
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    // 2. Build a Properties object holding the MySQL username and password.
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123")
    // 3. Read the `student` table from MySQL into a DataFrame.
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties)
    // 4. Show the table's rows (default: first 20).
    mysqlDF.show()
    spark.stop()
  }

}
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Demonstrates schema inference by mapping a text file onto the [[Person]]
 * case class, then exercising both the DSL and SQL query styles.
 *
 * Input path may be supplied as the first program argument; otherwise the
 * original hard-coded default is used (backward compatible).
 */
object InferringSchema {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession with an app name and local master.
    val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
    // 2. Obtain the SparkContext from the session.
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // reduce log noise

    // 3. Load the data. Generalized: allow the path to be passed as args(0).
    val inputPath: String = args.headOption.getOrElse("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    val dataRDD: RDD[String] = sc.textFile(inputPath)
    // 4. Split each line on single spaces.
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // 5. Map each record onto the Person case class (id, name, age).
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // 6. Convert the RDD to a DataFrame (requires the session's implicits).
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // ------------------- DSL-style operations: start --------------
    // 1. Show the DataFrame contents (default: first 20 rows).
    personDF.show()
    // 2. Print the inferred schema.
    personDF.printSchema()
    // 3. Print the row count.
    println(personDF.count())
    // 4. Print every column name.
    personDF.columns.foreach(println)
    // 5. Print the first row.
    println(personDF.head())
    // 6. Show all values of the `name` column.
    personDF.select("name").show()
    // 7. Show rows where age > 30.
    personDF.filter($"age" > 30).show()
    // 8. Count rows where age > 30.
    println(personDF.filter($"age" > 30).count())
    // 9. Group by age and count each group.
    personDF.groupBy("age").count().show()
    // ------------------- DSL-style operations: end ----------------

    // -------------------- SQL-style operations: start -------------
    // Register the DataFrame as a temporary view.
    personDF.createOrReplaceTempView("t_person")

    spark.sql("select * from t_person").show()

    // BUGFIX: the original used Unicode smart quotes (‘zhangsan‘), which is
    // not valid SQL string quoting; replaced with ASCII single quotes.
    spark.sql("select * from t_person where name='zhangsan'").show()

    spark.sql("select * from t_person order by age desc").show()
    // -------------------- SQL-style operations: end ---------------

    sc.stop()
  }
}

/** Record type used for schema inference: one person per input line. */
case class Person(id: Int, name: String, age: Int)
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Builds a DataFrame from an RDD of [[Row]]s paired with an explicitly
 * declared [[StructType]] schema, then queries it via DSL and SQL.
 */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, 2 threads) and grab its context.
    val session: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()
    val context: SparkContext = session.sparkContext

    // Load the raw text file, then split each line on single spaces.
    val rawLines: RDD[String] = context.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    val fields: RDD[Array[String]] = rawLines.map(line => line.split(" "))

    // Pack each record into a generic Row: (id: Int, name: String, age: Int).
    val rowRDD: RDD[Row] = fields.map(parts => Row(parts(0).toInt, parts(1), parts(2).toInt))

    // Declare the schema explicitly; all three columns are non-nullable.
    val idField   = StructField("id", IntegerType, false)
    val nameField = StructField("name", StringType, false)
    val ageField  = StructField("age", IntegerType, false)
    val personSchema: StructType = StructType(Seq(idField, nameField, ageField))

    // Combine the Row RDD with the schema into a DataFrame.
    val personDF: DataFrame = session.createDataFrame(rowRDD, personSchema)

    // DSL: display the DataFrame contents.
    personDF.show()

    // SQL: register a temp view and run queries against it.
    personDF.createOrReplaceTempView("t_person")
    session.sql("select * from t_person").show()
    session.sql("select count(*) from t_person").show()

    context.stop()
  }
}
package com.spark_sql

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

/**
 * Loads person records from a text file, sorts them by age (descending)
 * with Spark SQL, and writes the result to a MySQL table over JDBC.
 */
object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    //val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    // 1. Create the SparkSession (local mode, 2 threads).
    val session: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlToMysql")
      .master("local[2]")
      .getOrCreate()

    // 2-3. Read the input file and split each line on single spaces.
    val rawLines: RDD[String] = session.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    val tokens: RDD[Array[String]] = rawLines.map(line => line.split(" "))

    // 4. Map each record onto the student01 case class.
    val students: RDD[student01] = tokens.map(parts => student01(parts(0).toInt, parts(1), parts(2).toInt))

    // 5. Convert to a DataFrame (needs the session's implicits in scope).
    import session.implicits._
    val studentDF: DataFrame = students.toDF()

    // 6-7. Register as a temp view and sort by age, descending.
    studentDF.createOrReplaceTempView("student")
    val sortedDF: DataFrame = session.sql("select * from student order by age desc")

    // 8. Persist the result to MySQL. Credentials go in a Properties object.
    val jdbcProps = new Properties()
    jdbcProps.setProperty("user", "root")
    jdbcProps.setProperty("password", "123")

    sortedDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", jdbcProps)

    // NOTE: a SaveMode can be configured for the write — Overwrite replaces,
    // Append adds, Ignore skips, and the default (ErrorIfExists) fails when
    // the table already exists, e.g.:
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
    session.stop()
  }
}

// Record type for one student row: (id, name, age).
case class student01(id: Int, name: String, age: Int)
package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Pull-based Spark Streaming word count: polls events from a Flume agent,
 * splits each event body into words, and maintains a running per-word count
 * across batches via `updateStateByKey`.
 */
object SparkStreaming_Flume_Poll {

  /**
   * State-update function for `updateStateByKey`.
   *
   * @param newValues    all the 1s for one word in the current batch
   * @param runningCount accumulated historical count for that word
   * @return the new total: history plus this batch's contribution
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // Configure Spark and build the contexts; 5-second micro-batches.
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    val sparkContext: SparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("WARN") // keep console output readable
    val streamingContext: StreamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Checkpointing is required by updateStateByKey (stores running state).
    streamingContext.checkpoint("./")

    // Flume agent endpoint(s) to poll; more addresses can be appended here.
    val flumeEndpoints = Seq(new InetSocketAddress("192.168.107.144", 8888))

    // Pull events from Flume, buffering in memory and spilling to disk.
    val eventStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(streamingContext, flumeEndpoints, StorageLevel.MEMORY_AND_DISK)

    // Each event's payload lives in its body; decode it to a String.
    val lines: DStream[String] = eventStream.map(event => new String(event.event.getBody.array()))

    // Tokenize on spaces, pair each word with 1, and fold into running totals.
    val wordCounts: DStream[(String, Int)] =
      lines
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .updateStateByKey(updateFunction)

    wordCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }

}
spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll