SparkSql(2)

7.分区,分桶,排序

分区是按某个字段以目录级别划分
分桶是按某个字段以文件级别划分
排序是按照某个字段在文件内部(每个桶)有序
注意:
如果使用分桶和排序,必须使用持久化表
如果数据的数量较小,有的达不到设置的分桶数

def bps(spark:SparkSession)={
      import spark.implicits._
      val bpsDF=spark.read.load("D://work/path")
      bpsDF.write
        .partitionBy("flag")
        .sortBy("sqrt")
        .bucketBy(5,"number")
        .option("path","D://work/path1")

        .saveAsTable("sqrtAndcube")
        
    }

8.合并schema

两个相同路径下的DF可以按照字段名做merge合并
如果想把所有列全部显示,读取数据的时候需要开启merge开关
option(“mergeSchema”,“true”)

def schemaMerge(spark:SparkSession)={
      //创建第一个RDD
      import spark.implicits._
      val sc=spark.sparkContext
      sc.parallelize(List(1,2,3,6,5))
        .map(n=>(n,n*n))
        .toDF("number","sqrt")
        .write
        .mode(SaveMode.Overwrite)
        .save("D://work/path/flag=a")
      //创建第二个RDD
      sc.parallelize(List(4,5,6,9,6))
        .map(n=>(n,n*n*n))
        .toDF("number","cube")
        .write
        .mode(SaveMode.Overwrite)
        .save("D://work/path/flag=b")

      val resDF=spark.read.option("mergeSchema","true")load("D://work/path")
      resDF.show()
    }

9.读取hive数据

sparkSQL可以读取hive仓库的数据
如果使用hive,必须在sparkSession中添加hive的支持
语法与hive的语法一致
warehouse和元数据库的三种形式
a.如果没有任何路径设置,在当前项目下创建数据仓库和derby元数据库
(如果你写了支持hive的sparkSession,就不要写在写一个其他的,会报错)

val spark_hive=SparkSession.builder().master("local[*]").
      appName("app2").
      enableHiveSupport().
      getOrCreate()
    hiveoptions(spark_hive)
    spark_hive.stop()

b.如果设置仓库路径,在当前项目下创建derby元数据库,
在指定路径下创建数据仓库
在主方法里加上.config(”spark.sql.warehouse.dir“,”path“)

val spark_hive=SparkSession.builder().master("local[*]").
      appName("app2").
      enableHiveSupport().
      config("spark.sql.warehouse.dir","D://work/path").
      getOrCreate()
    hiveoptions(spark_hive)

    spark_hive.stop()

c.如果有hive-site.xml文件,里面设置了相应元数据的连接信息,
可以读取指定元数据库和元数据库指向的数据仓库的数据

10.连接mysql数据库(JDBC)

读:
a.spark.read.format(“jdbc”).option(“url”,xxx)… .load()

b.spark.read.jdbc(url,dbtable,prop)
写:
a.df.write.format(“jdbc”).option(“url”,xxx)… .save()
b.df.write.jdbc(url,dbtable,prop)

 val reDF=spark.read.format("jdbc").
          option("url","jdbc:mysql://localhost:3306/hive")
          .option("dbtable","peoples")
          .option("user","root")
          .option("password","root")
          .load()

        val reDF1=reDF.createOrReplaceTempView("peo")
          val resDF=spark.sql("select * from peo where id in (1,2,3)")
          resDF.write.format("jdbc")
            .option("url","jdbc:mysql://localhost:3306/hive")
            .option("dbtable","people2")
            .option("user","root")
            .option("password","root")
            .mode(SaveMode.Overwrite)
            .save()


        //第二种方式读写mysql里的数
        val prop=new Properties()
        prop.setProperty("user","root")
        prop.setProperty("password","root")
        val reDF2=spark.read.jdbc("jdbc:mysql://localhost:3306/hive","peoples",prop)
        reDF2.createOrReplaceTempView("peo")
        val resDF1=spark.sql("select * from peo where id>3")
        resDF1.write.jdbc("jdbc:mysql://localhost:3306/hive","peoples1",prop)
上一篇:hyperf 视图 smarty 模板引擎


下一篇:wordpress 搬家 如何修改配置文件?实用教程建议收藏