7.分区,分桶,排序
分区是按某个字段以目录级别划分
分桶是按某个字段以文件级别划分
排序是按照某个字段在文件内部(每个桶)有序
注意:
如果使用分桶和排序,必须使用持久化表
如果数据的数量较小,有的达不到设置的分桶数
def bps(spark:SparkSession)={
import spark.implicits._
val bpsDF=spark.read.load("D://work/path")
bpsDF.write
.partitionBy("flag")
.sortBy("sqrt")
.bucketBy(5,"number")
.option("path","D://work/path1")
.saveAsTable("sqrtAndcube")
}
8.合并schema
两个相同路径下的DF可以按照字段名做merge合并
如果想把所有列全部显示,读取数据的时候需要开启merge开关
option(“mergeSchema”,“true”)
def schemaMerge(spark:SparkSession)={
//创建第一个RDD
import spark.implicits._
val sc=spark.sparkContext
sc.parallelize(List(1,2,3,6,5))
.map(n=>(n,n*n))
.toDF("number","sqrt")
.write
.mode(SaveMode.Overwrite)
.save("D://work/path/flag=a")
//创建第二个RDD
sc.parallelize(List(4,5,6,9,6))
.map(n=>(n,n*n*n))
.toDF("number","cube")
.write
.mode(SaveMode.Overwrite)
.save("D://work/path/flag=b")
val resDF=spark.read.option("mergeSchema","true")load("D://work/path")
resDF.show()
}
9.读取hive数据
sparkSQL可以读取hive仓库的数据
如果使用hive,必须在sparkSession中添加hive的支持
语法与hive的语法一致
warehouse和元数据库的三种形式
a.如果没有任何路径设置,在当前项目下创建数据仓库和derby元数据库
(如果你写了支持hive的sparkSession,就不要写在写一个其他的,会报错)
val spark_hive=SparkSession.builder().master("local[*]").
appName("app2").
enableHiveSupport().
getOrCreate()
hiveoptions(spark_hive)
spark_hive.stop()
b.如果设置仓库路径,在当前项目下创建derby元数据库,
在指定路径下创建数据仓库
在主方法里加上.config(”spark.sql.warehouse.dir“,”path“)
val spark_hive=SparkSession.builder().master("local[*]").
appName("app2").
enableHiveSupport().
config("spark.sql.warehouse.dir","D://work/path").
getOrCreate()
hiveoptions(spark_hive)
spark_hive.stop()
c.如果有hive-site.xml文件,里面设置了相应元数据的连接信息,
可以读取指定元数据库和元数据库指向的数据仓库的数据
10.连接mysql数据库(JDBC)
读:
a.spark.read.format(“jdbc”).option(“url”,xxx)… .load()
b.spark.read.jdbc(url,dbtable,prop)
写:
a.df.write.format(“jdbc”).option(“url”,xxx)… .save()
b.df.write.jdbc(url,dbtable,prop)
val reDF=spark.read.format("jdbc").
option("url","jdbc:mysql://localhost:3306/hive")
.option("dbtable","peoples")
.option("user","root")
.option("password","root")
.load()
val reDF1=reDF.createOrReplaceTempView("peo")
val resDF=spark.sql("select * from peo where id in (1,2,3)")
resDF.write.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/hive")
.option("dbtable","people2")
.option("user","root")
.option("password","root")
.mode(SaveMode.Overwrite)
.save()
//第二种方式读写mysql里的数
val prop=new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
val reDF2=spark.read.jdbc("jdbc:mysql://localhost:3306/hive","peoples",prop)
reDF2.createOrReplaceTempView("peo")
val resDF1=spark.sql("select * from peo where id>3")
resDF1.write.jdbc("jdbc:mysql://localhost:3306/hive","peoples1",prop)