大数据实践(十一)SparkSQL模块基础

SparkSQL是Spark的一个子模块,主要用于操作结构化数据,借鉴了Hive。

此前使用的是SparkCore模块的RDD结构进行数据处理,SparkSQL提供了结构化的数据结构DataFrame、DataSet。

SparkSQL支持SQL、DSL(domain-specific language)两种方式、多种语言(Scala、Java、Python、R)进行开发,最后底层都转换为RDD.

SparkSQL支持多种数据源(Hive,Avro,Parquet,ORC,JSON 和 JDBC 等)、支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许你访问现有的 Hive 仓库.

大数据实践(十一)SparkSQL模块基础

零、SparkSession、IDEA整合Spark

SparkSession:这是一个新入口,取代了原本的SQLContext与HiveContext。

SparkContext也可以通过SparkSession获得。

1、SparkSession

交互式环境下启动spark后,自带一个变量spark

Spark context available as ‘sc‘ (master = local[*], app id = local-1608185209816).
Spark session available as ‘spark‘.

#读取文件
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]  

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+

在文件程序中,使用builder方法获取,本身没有公开的相关构造器可以被使用。

val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()

通过SparkSession入口可以进行数据加载、保存、执行SQL、以及获得其他入口(sqlContext、SparkContext)

2、使用IDEA开发Spark

Spark使用本地开发,将其集成到IDEA中,首先需要有scalasdk,版本要和操作的spark适配。

其次IDEA需要已经安装过scala开发插件。

接下来在maven项目配置文件中导入以下依赖地址即可。

 	<!-- SparkCore,基础模块 -->
	<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>

        <!-- SparkSQL,SQL模块 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>


        <!-- Spark和hive集成的模块,可用于处理hive中的数据 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0-preview2</version>
            <scope>provided</scope>
        </dependency>
<!--mysql驱动,与Mysql交互需要-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>

如果日志太多碍眼,可以调高日志等级,第一行修改为ERROR,在resources目录下使用log4j.properties文件覆盖默认日志配置文件。

#hadoop.root.logger=warn,console
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger‘s log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR

一、RDD、DataFrame、DataSet

RDD作为SparkCore的数据结构,可以处理结构化非结构化的数据,但效率较低。

SparkSQL中封装了对结构化数据处理效果更好的DataFrame、DateSet.

RDD以行为单位,对行有数据类型。

DataFrame来自pandas库的数据结构,相比于RDD提供了字段类型,DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。

Dataset 也是分布式的数据集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame
的优点,具备强类型的特点,同时支持 Lambda 函数,但只能在 Scala 和 Java 语言中使用。

三者可以互相转换。

对文本文件返回了 Dataset[String]

scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+


#结构
scala> text.printSchema
root
 |-- value: string (nullable = true)
#返回一个new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b

#读取json默认返回DF
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

#定义一个样例类,作为数据类型
scala> case class User(id :Long,username:String,email:String){}
defined class User

#加入数据类型,返回ds
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]

内部结构

scala> userds.printSchema
root
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)

互相转换:

scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25

scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

二、SQL、DSL

SparkSQL可以使用SQL编程,也可以基于特定语言,调用api进行编程。

1、SQL

使用SQL编程有两步:

? 1、创建临时视图。(有表或视图才能执行sql,spark可以创建临时视图)

? 2、编写sql语句。

#创建视图users
scala> userds.createOrReplaceTempView("users")

#编写语句
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

/*
createGlobalTempView            createOrReplaceTempView   crossJoin   
createOrReplaceGlobalTempView   createTempView 
*/

如果在文件程序中编写,需要导入以下隐式转换。

    import spark.implicits._ #spark是变量名
    import org.apache.spark.sql.functions._
2、DSL

DSL是特定语言,如scala、java等。

调用api进行处理。

#scala DSL编程
scala> userds.select(‘id,‘username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

使用Java进行编程。

 final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();

        Dataset<Row> dataset = spark.read().json("spark/data/user3.json");

        try{
            dataset.createTempView("users");
			//SQL
            spark.sql("select id,username from users order by id desc").show();
        }catch (Exception e){
            e.printStackTrace();
        }
		
		//DSL
		dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
		
        spark.close();

三、UDF

UDF:Spark和Hive一样,允许用户自定义函数来补充功能。

SQL方式和DSL方式都支持。

1、SQL方式

#注册函数
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)

scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
|       03|     Jim|
|       02|    jack|
|       01|    Bret|
+---------+--------+

#注册函数
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)

scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
|        4|     Jim|
|        3|    jack|
|        2|    Bret|
+---------+--------+


2、DSL方式

                       
#定义函数
scala> val prefix_name =udf(
     | (name:String)=>{ "user: "+name})

prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)

#套入字段名
scala> userds.select($"id",prefix_name(‘username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id|   newname|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

Java编写

	#接收参数名和一个函数接口
spark.udf().register("prefix_name", new UDF1<String, String>() {

           @Override
           public String call(String s) throws Exception {
               return "user: "+s;
           }
       }, DataTypes.StringType);


        try{
            dataset.createTempView("users");
	#和SQL一样使用
            spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
        }catch (Exception e){
            e.printStackTrace();
        }

+---+----------+
| id|  new_name|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

四、Spark on Hive

SparkSQL集成Hive,加载读取Hive表数据进行分析,称之为Spark on Hive;

Hive 框架底层分区引擎,可以将MapReduce改为Spark,称之为Hive on Spark;

Spark on Hive 相当于Spark使用Hive的数据。

Hive on Spark相当于Hive把自身的计算从MR换成SparkRDD.

可以通过以下方式进行Spark on Hive.

1、Sparksql

spark-sql是spark提供的一种交互模式,使用sql语句进行处理,默认就是使用hive中的数据。

需要将mysql驱动放到jar目录下,或者指定驱动。

spark-sql --master local[*]

#指定驱动
spark-sql  --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar

2、在IDEA中使用spark on hive

添加spark操作hive的依赖。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0-preview2</version>
    </dependency>

core-site.xml,hdfs-site.xml,hive-site.xml都拷贝到Rsource目录下.

可以使用Spark 操作Hive数据了。

 	val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf)
			 .enableHiveSupport()//!!!!默认不支持外部hive,这里需调用方法支持外部hive
			.getOrCreate()

    import spark.implicits._


    spark.sql("use spark_sql_hive") 


    spark.sql(
      """
        |select t.id,t.name,t.email from
        |(select cast(‘id‘ as INT) ,id ,name,email from user where id >1001) t
      """.stripMargin).show()

大数据实践(十一)SparkSQL模块基础

上一篇:Alembic管理Sqlite3数据库版本


下一篇:Spring boot整合JdbcTemplate