Loading Data into a DataFrame with SparkSQL

The workflow for loading a DataFrame:

 

①. Create a SparkSession object
②. Create a DataFrame object
③. Create a view
④. Process the data
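
The four steps above can be sketched end to end as follows. This is a minimal illustration rather than the article's own code: the in-memory rows, the view name `people`, and the app name are assumptions made up for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Step 1: create the SparkSession (local mode, as in the other examples here)
val session = SparkSession.builder().master("local").appName("workflow-sketch").getOrCreate()
import session.implicits._

// Step 2: create a DataFrame; these in-memory rows are made-up sample data
val people: DataFrame = Seq((1, "zhangsan", 18), (2, "lisi", 19)).toDF("id", "name", "age")

// Step 3: register a temporary view so the data can be queried with SQL
people.createOrReplaceTempView("people")

// Step 4: process the data with SQL
val adults: DataFrame = session.sql("select name, age from people where age > 18")
adults.show()
```

The sections below follow the same pattern and differ only in how the DataFrame in step 2 is obtained.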

 

1. Reading CSV data into a DataFrame

 

import org.apache.spark.sql.{DataFrame, SparkSession}

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
// Equivalent shorthand:
// val frame: DataFrame = session.read.option("header", true).csv("./data/csvdata.csv")
val frame = session.read.option("header", true).format("csv").load("./data/csvdata.csv")
frame.show()
/**
 * +---+--------+---+-----+
 * | id|    name|age|score|
 * +---+--------+---+-----+
 * |  1|zhangsan| 18|  100|
 * |  2|    lisi| 19|  200|
 * |  3|  wangwu| 20|  300|
 * |  4|   maliu| 21|  400|
 * |  5|  tianqi| 22|  500|
 * +---+--------+---+-----+
 */
// Register a temporary view and query it with SQL
frame.createTempView("df1")
session.sql(
  """
    |select * from df1
    |""".stripMargin).show()
/**
 * +---+--------+---+-----+
 * | id|    name|age|score|
 * +---+--------+---+-----+
 * |  1|zhangsan| 18|  100|
 * |  2|    lisi| 19|  200|
 * |  3|  wangwu| 20|  300|
 * |  4|   maliu| 21|  400|
 * |  5|  tianqi| 22|  500|
 * +---+--------+---+-----+
 */
// Write the data out to the directory ./xxxx
frame.write.csv("./xxxx")

 

2. Reading Hive data into a DataFrame

 

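1). Hive support must be enabled: .enableHiveSupport()
2). SQL written in SparkSQL then operates directly on the tables in Hive.
3). Analysis results can be saved back to Hive. The target table does not have to be created by hand; it is created automatically.
4). To load a Hive table directly, use session.table(xx) to get a DataFrame.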
import org.apache.spark.sql.{DataFrame, SparkSession}

val session = SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()
// Switch to the Hive database named spark
session.sql("use spark")
val df: DataFrame = session.sql("select count(*) from jizhan")
df.show()
// Load a Hive table directly as a DataFrame
val df2 = session.table("jizhan")
df2.show()
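
Point 3) above, saving a result back to Hive without creating the table first, can be sketched with `saveAsTable`. This is an illustration under assumptions: the rows and the table name `result_sketch` are made up, `master("local")` is set only so the snippet runs standalone, and the spark-hive module has to be on the classpath for `enableHiveSupport()` to work.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hive support needs the spark-hive module on the classpath
val session = SparkSession.builder().master("local").appName("hive-save-sketch")
  .enableHiveSupport().getOrCreate()
import session.implicits._

// Made-up result rows standing in for the output of a real analysis
val result: DataFrame = Seq(("site_a", 10L), ("site_b", 20L)).toDF("site", "cnt")

// saveAsTable creates result_sketch if it does not exist;
// SaveMode.Overwrite replaces it when the snippet is run again
result.write.mode(SaveMode.Overwrite).saveAsTable("result_sketch")

// Read the saved table back the same way as any other Hive table
val saved: DataFrame = session.table("result_sketch")
saved.show()
```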

3. Reading JSON data into a DataFrame

 

  The data looks like this:

{"name":"zhangsan","age":18}
{"name":"lisi","age":19}
{"name":"tianqi","age":22}
{"name":"zhangsan"}
{"name":"lisi"}
......

How the DataFrame is loaded:

import org.apache.spark.sql.{DataFrame, SparkSession}

val session: SparkSession = SparkSession.builder().master("local").appName("sqltest").getOrCreate()
session.sparkContext.setLogLevel("ERROR")
val df1: DataFrame = session.read.json("./data/jsondata")
// A global temp view is shared across sessions and lives in the global_temp database
df1.createGlobalTempView("t2")
val session2: SparkSession = session.newSession()
session2.sql("select name, age from global_temp.t2").show()
/**
 * +--------+----+
 * |    name| age|
 * +--------+----+
 * |zhangsan|  18|
 * |    lisi|  19|
 * |  tianqi|  22|
 * |zhangsan|null|
 * |    lisi|null|
 * +--------+----+
 */

4. Reading a JSON-format RDD into a DataFrame (a JSON-format RDD is an RDD in which each String is a JSON string)

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonArr: Array[String] = Array[String](
  "{\"name\":\"zhangsan\",\"age\":18}",
  "{\"name\":\"lisi\",\"age\":19}",
  "{\"name\":\"wangwu\",\"age\":20}",
  "{\"name\":\"maliu\",\"age\":21}",
  "{\"name\":\"tianqi\",\"age\":22}"
)
import session.implicits._
// read.json(RDD[String]) is deprecated since Spark 2.2, so the strings are wrapped in a Dataset[String]
val jsonDataset: Dataset[String] = jsonArr.toList.toDS()
val df1: DataFrame = session.read.json(jsonDataset)
df1.createTempView("t")
val df2: DataFrame = session.sql("select name,age from t where name like '%zhangsan%'")
df2.show()
/**
 * +--------+---+
 * |    name|age|
 * +--------+---+
 * |zhangsan| 18|
 * +--------+---+
 */

5. Reading MySQL data into a DataFrame

 

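    1). There are three ways to read a MySQL table into a DataFrame.
    2). Results can be saved back to MySQL.
    3). spark.sql.shuffle.partitions (default 200) sets the number of partitions used whenever a shuffle occurs in the underlying Spark job.
    4). To read the result of a join statement from MySQL, wrap the statement as a subquery with an alias and have SparkSQL read it in place of a table name.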

 

Method 1:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
val person: DataFrame = session.read.jdbc("jdbc:mysql://192.168.179.14/spark", "person", props)
// The score table is loaded the same way here so that this snippet stands on its own
val score: DataFrame = session.read.jdbc("jdbc:mysql://192.168.179.14/spark", "score", props)
// Join the two tables and save the result back to MySQL
person.createTempView("person")
score.createTempView("score")
val resultDF = session.sql(
  """
    |select a.id,a.name,a.age,b.score from person a ,score b where a.id = b.id
  """.stripMargin)
resultDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.14/spark", "result", props)

Method 2:

import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
// Pass all connection settings as a single options map
val map = Map[String, String](
  "user" -> "root",
  "password" -> "123456",
  "url" -> "jdbc:mysql://192.168.179.14/spark",
  "driver" -> "com.mysql.jdbc.Driver", // Connector/J 8+ names this class com.mysql.cj.jdbc.Driver
  "dbtable" -> "score"
)
val score = session.read.format("jdbc").options(map).load()

Method 3:

import org.apache.spark.sql.{DataFrameReader, SparkSession}

val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
// Set the connection settings one option at a time
val reader: DataFrameReader = session.read.format("jdbc").option("user", "root")
  .option("password", "123456")
  .option("url", "jdbc:mysql://192.168.179.14/spark")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "score")
val score2 = reader.load()
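
Point 4) in the list above, reading a MySQL join through an alias, can be sketched as follows. The `dbtable` option accepts anything valid in a FROM clause, so the join is passed as a parenthesised subquery followed by an alias. The alias `T` and the helper `loadJoined` are hypothetical names introduced for this illustration; the connection settings are copied from the examples above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// The join is written as a parenthesised subquery;
// MySQL requires the trailing alias (T is an arbitrary name)
val joinQuery: String =
  "(select a.id, a.name, a.age, b.score from person a, score b where a.id = b.id) T"

// Same connection settings as the examples above; only dbtable changes
val joinOptions: Map[String, String] = Map(
  "user" -> "root",
  "password" -> "123456",
  "url" -> "jdbc:mysql://192.168.179.14/spark",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> joinQuery
)

// Hypothetical helper: the join runs inside MySQL and Spark reads only its result
def loadJoined(session: SparkSession): DataFrame =
  session.read.format("jdbc").options(joinOptions).load()
```

Calling `loadJoined(session).show()` against a reachable MySQL instance would then display the joined rows. Compared with loading both tables and joining them in Spark, this pushes the join down to the database.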

6. Converting a plain RDD into a DataFrame

 

There are two ways to do the conversion:

 

1). Converting a plain RDD into a DataFrame by reflection

 

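Note: when an RDD of a custom type is converted into a DataFrame by reflection, the fields of the object automatically become the DataFrame's column names, and the types of those fields become the column types in the DataFrame's schema.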

 

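import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumed definition, inferred from how PersonInfo is constructed below.
// It must be declared at the top level (outside the method that calls toDF).
case class PersonInfo(id: Int, name: String, age: Int, score: Double)

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("ERROR")
val personInfos: RDD[String] = sc.textFile("./data/personInfo")
// Parse each comma-separated line into a PersonInfo object
val personRDD: RDD[PersonInfo] = personInfos.map(info => {
  val arr = info.split(",")
  val id = arr(0).toInt
  val name = arr(1)
  val age = arr(2).toInt
  val score = arr(3).toDouble
  PersonInfo(id, name, age, score)
})
import session.implicits._
val frame: DataFrame = personRDD.toDF()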

 

2). Converting a plain RDD into a DataFrame by building the schema dynamically

Note: when creating the StructType, the order of its StructField entries must match the order in which values are placed into each Row of the RDD[Row].

 

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("ERROR")
val personInfos: RDD[String] = sc.textFile("./data/personInfo")
// Parse each comma-separated line into a Row: id, name, age, score
val rowRDD: RDD[Row] = personInfos.map(line => {
  val arr = line.split(",")
  val id = arr(0).toInt
  val name = arr(1)
  val age = arr(2).toInt
  val score = arr(3).toDouble
  Row(id, name, age, score)
})
// Field order here matches the order of the values in each Row
val structType = StructType(List[StructField](
  StructField("id", DataTypes.IntegerType, true),
  StructField("name", DataTypes.StringType, true),
  StructField("age", DataTypes.IntegerType, true),
  StructField("score", DataTypes.DoubleType, true)
))
val frame = session.createDataFrame(rowRDD, structType)

 

7. Loading a DataFrame from a Dataset of tuples

 

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
session.sparkContext.setLogLevel("ERROR")
import session.implicits._
val ds: Dataset[String] = session.read.textFile("./data/pvuvdata")
val tupleDs: Dataset[(String, String, String, String, String, String, String)] = ds.map(line => {
  // Sample line (tab-separated):
  // 126.54.121.136 浙江 2020-07-13 1594648118250  4218643484448902621  www.jd.com Comment
  val arr = line.split("\t")
  val ip = arr(0)
  val local = arr(1)
  val date = arr(2)
  val ts = arr(3)
  val uid = arr(4)
  val site = arr(5)
  val operator = arr(6)
  (ip, local, date, ts, uid, site, operator)
})
// Name the tuple columns when converting to a DataFrame
val frame: DataFrame = tupleDs.toDF("ip", "local", "date", "ts", "uid", "site", "operator")

 
