Workflow for loading a DataFrame:
①. Create a SparkSession object  ②. Create a DataFrame object  ③. Create a view  ④. Process the data
1. Load a DataFrame by reading CSV data
val session = SparkSession.builder().master("local").appName("test").getOrCreate()
// val frame: DataFrame = session.read.option("header", true).csv("./data/csvdata.csv")
val frame = session.read.option("header", true).format("csv").load("./data/csvdata.csv")
frame.show()
/**
 * +---+--------+---+-----+
 * | id|    name|age|score|
 * +---+--------+---+-----+
 * |  1|zhangsan| 18|  100|
 * |  2|    lisi| 19|  200|
 * |  3|  wangwu| 20|  300|
 * |  4|   maliu| 21|  400|
 * |  5|  tianqi| 22|  500|
 * +---+--------+---+-----+
 */
frame.createTempView("df1")
session.sql(
  """
    |select * from df1
    |""".stripMargin).show()
/**
 * +---+--------+---+-----+
 * | id|    name|age|score|
 * +---+--------+---+-----+
 * |  1|zhangsan| 18|  100|
 * |  2|    lisi| 19|  200|
 * |  3|  wangwu| 20|  300|
 * |  4|   maliu| 21|  400|
 * |  5|  tianqi| 22|  500|
 * +---+--------+---+-----+
 */
// Write the data out to the directory ./xxxx
frame.write.csv("./xxxx")
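By default the CSV writer omits the header row and fails if the target directory already exists. As a hedged variation on the write above (the save mode and the header option are choices of this sketch, not part of the original example):

frame.write
  .mode(SaveMode.Overwrite)   // replace ./xxxx if it already exists
  .option("header", true)     // keep the column names in the output files
  .csv("./xxxx")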
2. Load a DataFrame by reading data from Hive
1). Hive support must be enabled with .enableHiveSupport().
2). The SQL written through the session then operates directly on the tables in Hive.
3). Analysis results can be saved back to Hive; the target table does not have to be created by hand, it is created automatically (see the sketch after the example below).
4). A Hive table can also be loaded directly into a DataFrame with session.table(xx).
val session = SparkSession.builder().appName("test").enableHiveSupport().getOrCreate()
session.sql("use spark")
val df: DataFrame = session.sql("select count(*) from jizhan")
df.show()
// Load a DataFrame directly from the Hive table
val df2 = session.table("jizhan")
df2.show()
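For point 3) above, a result can be written back to Hive with saveAsTable. A minimal sketch, where resultDF stands for any DataFrame produced by the analysis and the table name result is an assumption of this sketch:

// resultDF is a hypothetical result DataFrame; the Hive table "result" is created automatically.
resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark.result")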
3. Load a DataFrame by reading JSON data
Sample data:
{"name":"zhangsan","age":18}
{"name":"lisi","age":19}
{"name":"tianqi","age":22}
{"name":"zhangsan"}
{"name":"lisi"}
......
Loading the DataFrame:
val session: SparkSession = SparkSession.builder().master("local").appName("sqltest").getOrCreate()
session.sparkContext.setLogLevel("Error")
val df1: DataFrame = session.read.json("./data/jsondata")
df1.createGlobalTempView("t2")
val session2: SparkSession = session.newSession()
session2.sql("select name, age from global_temp.t2").show()
/**
 * +--------+----+
 * |    name| age|
 * +--------+----+
 * |zhangsan|  18|
 * |    lisi|  19|
 * |  tianqi|  22|
 * |zhangsan|null|
 * |    lisi|null|
 * +--------+----+
 */
4. Load a DataFrame from a JSON-formatted RDD (a JSON-formatted RDD is simply an RDD whose String elements are JSON strings)
val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonArr: Array[String] = Array[String](
  "{\"name\":\"zhangsan\",\"age\":18}",
  "{\"name\":\"lisi\",\"age\":19}",
  "{\"name\":\"wangwu\",\"age\":20}",
  "{\"name\":\"maliu\",\"age\":21}",
  "{\"name\":\"tianqi\",\"age\":22}"
)
import session.implicits._
val jsonDataset: Dataset[String] = jsonArr.toList.toDS()
val df1: DataFrame = session.read.json(jsonDataset)
df1.createTempView("t")
val df2: DataFrame = session.sql("select name,age from t where name like '%zhangsan%'")
df2.show()
/**
 * +--------+---+
 * |    name|age|
 * +--------+---+
 * |zhangsan| 18|
 * +--------+---+
 */
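If the JSON strings already live in an RDD[String], as the heading of this section suggests, one way is to wrap the RDD in a Dataset before handing it to the JSON reader. A sketch under that assumption (jsonRDD is built here from the jsonArr above purely for illustration):

// jsonRDD is a hypothetical RDD[String] whose elements are JSON strings.
val jsonRDD: RDD[String] = session.sparkContext.parallelize(jsonArr)
import session.implicits._
val ds: Dataset[String] = session.createDataset(jsonRDD)  // needs the implicit Encoder[String]
val df3: DataFrame = session.read.json(ds)
df3.show()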
5. Load a DataFrame by reading data from MySQL
1). There are three ways to read a MySQL table into a DataFrame.
2). The result can be saved back to MySQL.
3). spark.sql.shuffle.partitions (default 200) sets the number of partitions the underlying Spark job uses when a shuffle is produced.
4). If what you read from MySQL is a join statement, it has to be wrapped in a subquery with an alias before Spark SQL can read it (see the sketch below).
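For point 4), the usual pattern is to pass the join wrapped in parentheses and given an alias as the dbtable option. A minimal sketch, where the join query and the alias t are assumptions of this sketch and the connection options mirror the examples that follow:

val joined = session.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.179.14/spark")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  // Wrapping the join in parentheses with the alias t lets MySQL treat it as a derived table.
  .option("dbtable", "(select p.id, p.name, s.score from person p join score s on p.id = s.id) t")
  .load()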
The first way:
val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
val person: DataFrame = session.read.jdbc("jdbc:mysql://192.168.179.14/spark", "person", props)
// score is a DataFrame loaded from the MySQL table "score" (see the second and third ways below)
person.createTempView("person")
score.createTempView("score")
val resultDF = session.sql(
  """
    |select a.id,a.name,a.age,b.score from person a ,score b where a.id = b.id
  """.stripMargin)
// Save the result back to MySQL
resultDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.14/spark", "result", props)
The second way:
val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
val map = Map[String, String](
  "user" -> "root",
  "password" -> "123456",
  "url" -> "jdbc:mysql://192.168.179.14/spark",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "score"
)
val score = session.read.format("jdbc").options(map).load()
The third way:
val session = SparkSession.builder().config("spark.sql.shuffle.partitions", 1).master("local").appName("test").getOrCreate()
val reader: DataFrameReader = session.read.format("jdbc")
  .option("user", "root")
  .option("password", "123456")
  .option("url", "jdbc:mysql://192.168.179.14/spark")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "score")
val score2 = reader.load()
6. Convert a plain RDD to a DataFrame
There are two ways to do the conversion:
①. Convert a plain RDD to a DataFrame by reflection
Note: when an RDD of a custom type is converted to a DataFrame by reflection, the fields of the object are automatically used as the DataFrame's column names, and the field types are used as the schema of the corresponding columns.
// The case class must be defined outside the method (e.g. at top level) so toDF can derive the schema by reflection.
case class PersonInfo(id: Int, name: String, age: Int, score: Double)

val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("Error")
val personInfos: RDD[String] = sc.textFile("./data/personInfo")
val personRDD: RDD[PersonInfo] = personInfos.map(info => {
  val arr = info.split(",")
  val id = arr(0).toInt
  val name = arr(1)
  val age = arr(2).toInt
  val score = arr(3).toDouble
  PersonInfo(id, name, age, score)
})
import session.implicits._
val frame: DataFrame = personRDD.toDF()
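To confirm that the column names and types really come from PersonInfo's fields, the inferred schema can be printed. A small follow-up sketch; the comment describes the expected mapping rather than exact output:

frame.printSchema()
// The schema is derived from PersonInfo by reflection:
// id and age become integer columns, name a string column, score a double column.
frame.show()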
②. Convert a plain RDD to a DataFrame by dynamically creating a schema
Note: when building the StructType, the order of the StructField entries must match the order in which the values are placed into each Row of the RDD[Row].
val session = SparkSession.builder().master("local").appName("test").getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("Error")
val personInfos: RDD[String] = sc.textFile("./data/personInfo")
val rowRDD: RDD[Row] = personInfos.map(line => {
  val arr = line.split(",")
  val id = arr(0).toInt
  val name = arr(1)
  val age = arr(2).toInt
  val score = arr(3).toDouble
  Row(id, name, age, score)
})
val structType = StructType(List[StructField](
  StructField("id", DataTypes.IntegerType, true),
  StructField("name", DataTypes.StringType, true),
  StructField("age", DataTypes.IntegerType, true),
  StructField("score", DataTypes.DoubleType, true)
))
val frame = session.createDataFrame(rowRDD, structType)
7. Load a DataFrame from a Dataset of tuples
val session = SparkSession.builder().master("local").appName("test").getOrCreate()
session.sparkContext.setLogLevel("Error")
import session.implicits._
val ds: Dataset[String] = session.read.textFile("./data/pvuvdata")
val tupleDs: Dataset[(String, String, String, String, String, String, String)] = ds.map(line => {
  // Sample line: 126.54.121.136 浙江 2020-07-13 1594648118250 4218643484448902621 www.jd.com Comment
  val arr = line.split("\t")
  val ip = arr(0)
  val local = arr(1)
  val date = arr(2)
  val ts = arr(3)
  val uid = arr(4)
  val site = arr(5)
  val operator = arr(6)
  (ip, local, date, ts, uid, site, operator)
})
val frame: DataFrame = tupleDs.toDF("ip", "local", "date", "ts", "uid", "site", "operator")
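Once the columns are named, the DataFrame can go straight into step ④ of the workflow at the top: create a view and query it. The aggregation below is only an illustrative sketch, not part of the original example; it assumes pv means one row per visit and uv means distinct uid values per site:

frame.createTempView("logs")
// PV: total visits per site; UV: distinct visitors per site (assumed semantics, for illustration)
session.sql(
  """
    |select site,
    |       count(*)            as pv,
    |       count(distinct uid) as uv
    |from logs
    |group by site
    |order by pv desc
    |""".stripMargin).show()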