Spark+sparkSql

Spark

MapReduce的槽点
1.一个简单的WC程序,需要很多的java代码
1)自定义Mapper
2)自定义Reducer
3)通过Driver把Mapper和Reducer串起来
4)打包,上传到集群
5)在集群上提交WC程序

一句话:就是会花费非常多的时间在非业务逻辑改动的工作上

2. MapTask和ReduceTask都是进程级别
第一个MR的输出要先落地,然后第二个MR把第一个MR的输出当做输入
中间过程的数据需要落地,磁盘,网络开销很大,包括数据的序列化与反序列化

SparK特性:
1)Speed
both batch and streaming data 批流一体
快,主要体现在哪些角度?
2)Ease of Use
SQL,Scala,JAVA,Python,R
high-level operators
3) Generality
stack 栈 生态
4)Runs Everywhere
local,YARN,k8s...可以访问很多类型的数据

Spark Stack
Spark SQL + Spark Streaming + MLlib + GraphX

Spark运行模式
local:本地运行,在开发代码的时候,使用该模式进行测试
standalone:Hadoop部署多个节点,同理Spark可以部署多个节点,用得不多
YARN:将Spark作业提交到YARN上运行,Spark仅仅是一个客户端
Mesos:用的很少
K8s:2.3版本才正式稍微稳定,是未来比较好的方向

******运行模式和代码没有任何关系

rdd.collect().foreach(print)
本地模式下启动spark shell: ./bin/spark-shell --master local
用spark-submit 来提交jar包到local 或者Yarn模式上并运行

Spark对比于Hadoop

Spark SQL是什么
Spark SQL is Apache Spark`s moduls for working with structed data.

特性:
1) 集成性:在Spark编程中无缝对接多种复杂的SQL
2) 统一的访问方式:以类似的方式访问多种不同的数据源,而且可以进行相关操作
spark.read.format("json/text/parquet/...").load(path)
3) 兼容Hive
allowing you to access existing hive warehouse
如果你想把hive的作业迁移到Spark SQL,可以直接访问,这样的话,迁移成本会低很多
4)标准的数据连接:提供标准的JDBC/ODBC连接方式 Server

误区1. Spark SQL就是一个SQL处理框架
Spark SQL应用并不局限于SQL,还支持Hive,JSON,Parquet文件的直接读取以及操作
SQL仅仅只是Spark SQL中的一个功能而已

为什么要学习Spark SQL
sql带来的便利性
Spark Core: RDD
Spark SQL: 底层的Catalyst 为我们自动做了很多的优化工作
SQL(只要了解业务逻辑,然后使用SQL来实现)
DF/DS:面向API编程的,使用一些JAVA,Scala

Spark Sql架构
Fribtend
Hive AST SQL语句(字符串)==> 抽象语法树
Spark Program : DF/DS API
Streaming SQL
Catalyst
Unresolved LogicPlan
select empno,ename, from emp
Schema Catalog
和MetaStore作对比
LogicPlan
Optimized LogicPlan
优化后的sql语句:将我们的SQl作用上很多内置的规则,使得我们拿到的逻辑执行计划是比较好的
Physical Plan 物理执行计划
Backend


spark-shell的使用
每个Spark应用程序(spark-shell)在不同目录下启动,其实在该目录下是有metastore_db
是单独的
如果你想spark-shell共享我们的元数据的话,肯定要指定元数据信息

spark.sql(“show tables”).show
(其中的spark是默认生成的SparkSession)

spark-shell底层调用的是spark-submit
spark-submit底层调用的是spark-class

spark-sql底层调用的也是spark-submit
只是二者调用的spark-submit的类不一样

./spark-submit \
--class org.example.test.TestApp \
--master yarn \
--name TestApp \
/usr/local/src/spark-2.4.4-bin-hadoop2.7/lib/SparkSql-1.0-SNAPSHOT.jar \
hdfs://master:9000/data/testData hdfs://master:9000/data/out

SparkSession:Spark编程的入口点(spark2.x后引入的)
SparkSession.builder().master("local").getOrCreate()

在对DataFrame和Dataset进行很多操作时都需要这个包进行支持
import spark.implicits._
//这里的spark是SparkSession的变量名

DataFrame是什么
A DataSet is a distributed collertion of data
A DataFrame is a DataSet organized into named columns
以列(列名,列类型,列值)的形式构成的分布式数据集
id int 10
name string pk
df类似于数据库中的一张表,可以直接用SQL来进行查询,
df.printSchema()来查看表结构
DF=DS(row)

DataFrame API操作
df.select("name").show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
df.orderBy(desc("pop")).show()按照降序排列
//先将DF注册成一张表,接着就可以使用sql语句进行操作
df.createOrReplaceTempView("people")
spark.sql("select name from people where age > 21").show()

Dataset
A Dataset is a distributed collection of data
创建一个DS: val people:DataSet[Person] = spark.read.json("...").as[Person]
val primitiveDS:DataSet[Int]=Seq(1,2,3).toDS()

DataFrame vs Dataset vs RDD

DataFrame = Dataset[Row]
Dataset是一种强类型 typed类型
example:
df.select("name").show()
ds.map(_.name).show()
RDD一般和spark mlib同时使用
RDD不支持sparksql操作

DF/DS ==> RDD
val rdd1=testDF.rdd
val rdd2=testDS.rdd

RDD ==>DF
import spark.implicits._
val testDF = rdd.map(_.split(","))
.map {x => Person(x(0), x(1).trim.toInt)}.toDF()

RDD ==>DS
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

Dataset ==> DataFrame
import spark.implicits._
val testDF = testDS.toDF

DataFrame ==> Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

Array ==> RDD
val userRDD:RDD[String] = spark.sparkContext.parallelize(logArray)

在日常开发过程中,我们使用Spark SQL来进行日志处理(90%)

*****
你要处理一个目录下或者指定文件的日志数据,假设数据格式是文本类型的
若直接使用spark.read.text(path)读进来之后,结果只有一个string 类型的名字为value的值,很不方便处理
则此时一般使用spark.sparkContext.textFile(path)来读取text文本数据
读取后为RDD格式,再将RDD转换为DF/DS,再进行相应的操作

1) uses reflection to infer the schema of an RDD that contains specific types of objects.
反射
2) through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.

对于字段比较少的场景,个人推荐第一种
import spark.implicits._
case class People(name: String, age: Int)
val peopleRDD: RDD[String] = spark.sparkContext.textFile("file:///Users/ww/Desktop/SparkSql/data/people.txt")
val peopleDF: DataFrame = peopleRDD.map(_.split(",")) //RDD
.map(x => People(x(0), x(1).trim.toInt))
.toDF()
...
对于字段比较多的场景,个人推荐第二种,自己灵活定制
//step1
val peopleRDD: RDD[String] = spark.sparkContext.textFile("file:///Users/ww/Desktop/SparkSql/data/people.txt")
val peopleRowRDD: RDD[Row] = peopleRDD.map(_.split(",")) //RDD
.map(x => Row(x(0), x(1).trim.toInt))

//step2
val struct =
StructType(
StructField("name", StringType, true) ::
StructField("age", IntegerType, false) :: Nil)

//step3
val peopleDF: DataFrame = spark.createDataFrame(peopleRowRDD, struct)
...


DataSources *****
loading and saving data
文本文件 ,json,parquet,mysql,hive

spark.read.format("text/json/parquet").load(path)
spark.write.format("text/json/parquet").mode("overwrite/append").save(path)

***** mysql读取 *****

在pom文件中添加mysql驱动依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
第一种方式:直接读取
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "chen")

val jdbcDF = spark.read
.jdbc("jdbc:mysql://master:3306", "chen.tablename", connectionProperties)

jdbcDF.write.jdbc("jdbc:mysql://master:3306","chen.tablename",connectionProperties)
第二种方式:从配置文件加载,方便修改(推荐)
1.添加依赖
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
</dependency>
2.在resources中新建application.conf文件
db.default.driver="com.mysql.cj.jdbc.Driver"
db.default.url="jdbc:mysql://192.168.187.10:3306"
db.default.user=root
db.default.password=chen
db.default.database=chen
db.default.table=wordcount
db.default.sink.table=wordcount3
3.代码中加入
val config = ConfigFactory.load()
val url = config.getString("db.default.url")
val user = config.getString("db.default.user")
val password = config.getString("db.default.password")
val driver = config.getString("db.default.driver")
val database = config.getString("db.default.database")
val table = config.getString("db.default.table")
val sink = config.getString("db.default.sink.table")

val connectionProperties = new Properties()
connectionProperties.put("user", user)
connectionProperties.put("password", password)
val jdbcDF3 = spark.read
.jdbc(url, s"$database.$table", connectionProperties)

jdbcDF3.filter($"wordcount" > 1).write.jdbc(url,s"$database.$sink",connectionProperties)

Spark Sql 如何对接Hive

Hive的底层元数据信息是存储在MySql中, $HIVIE_HOME/conf/hive-site.xml
Spark如果能够直接访问到mysql中的元数据信息 $SPARK_HOME/conf/hive-site.xml
然后使用spark-shell启动就可以了

Server-client *****

我们在服务器上启动一个Spark应用程序你的Server 7*24小时运行
客户端可以连接到Server上去干活

启动thriftServer
$SPARK_HOME/sbin
./start-thriftserver.sh --master local

连接方式1
启动客户端工具 $SPARK_HOME/bin/beeline
./beeline -u jdbc:hive2://master:10000
连接后即可使用sql操作
并且可以各个节点各自启动一个beeline

连接方式2
使用代码来连接ThriftServer

使用Spark代码来访问Hive的数据
val spark = SparkSession
.builder()
.appName("Example")
.master("local")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

spark.table("chen.test").show()

将数据写入到hive表中
方式一:df.write.insertInto("")
方式二:df.write.mode("overwrite").saveAsTable("")

Spark SQL函数
内置函数
import org.apache.spark.sql.functions._

pv: df.groupBy("day").agg(count("userId").as("pv")).show()
uv: df.groupBy("day").agg( countDistinct("userId").as("uv")).show()
自定义函数(UDF)
三部曲
1)定义函数
2)注册函数
3)使用函数

//将方法注册为“hobby_num”
spark.udf.register("hobby_num",(s:String) => s.split(",").size)


基于Spark和Kudu的广告业务项目实战

广告业务背景
传统广告
互联网广告

项目架构
Flume采集各个服务器上的日志信息,保存为json格式,存储在hdfs上
通过spark对该日志数据进行ETL后,分析出IP地址对应的省市区以及其他信息,并将结果存入Kudo中的ODS表中(按天存储)
再根据需求从ODS表中读取数据进行相应的统计
统计结果依旧存入新的ODS


Spark调优策略

1)合理的资源设置
--executor-memory MEM 1G 每个executor的内存大小
--executor-cores NUM 1 每个executor占用的cpu core数量
--num-executors 2 executor的数量
--queue root.用户名 运行的队列

提交时:
${SPRAK_HOME}/bin/spark-submit --class xxxxx \
master yarn \
--deploy-mode cluster \
--executor-cores ? \
--num-executors ? \
--executor-memory ? \
application.jar xxx yyy zzz

2)广播变量的使用
每个节点一个copy副本,而不是每个task一个、
是把小表的数据通过sc广播出去

spark.sparkContext.broadcast(peopleInfo)
peopleInfo是一个map数据

mapPartitions做的事情:遍历大表的每一行数据,和广播变量的数据进行对比,有就取出来,没有就拉倒

3)Shuffle调优
在spark-submit时通过 --conf 方式,将下面的参数一一设置进去即可

map端的缓冲区大小:
spark.shuffle.file.buffer
如果过小 ==> 数据频繁的写入磁盘文件,增加了IO开销、
reduce端拉取数据缓冲区大小:
spark.reducer.maxSizeInFlight
reduce端拉取数据重试次数:
spark.shuffle.io.maxRetries
reduce端拉取数据等待间隔:
spark.shuffle.io.retryWait

4)JVM GC相关调优 (垃圾回收机制)
1、若file not found/ file lost /timeout
调节连接等待时长: 默认120s
spark.core.connection.ack.wait.timeout

2、若executor lost,oom,shuffle output file not found
调节executor堆外内存
spark.yarn.executor.memoryOverhead 1-2G之间

5)其他调优
详细查看官网

分布式大数据SQL查询引擎Presto
Presto是什么:
distributed SQL query engine
分布式SQL查询引擎:交互式分析查询,数据量支持GB到PB
解决商业数据仓库的交互式分析和速度的问题
Presto能做什么
querying data where it lives
Hive, Cassandra, relational databases,proprietary data stores
Hive,Cassandra,关系型数据库以及特有数据存储

==> 使用Presto来进行跨引擎查询: 比如实现hive join mysql

Presto是一个可以运行在多个服务器组成的分布式集群之上
包括一个coordinator和多个worker
client 发起命令到 coordinator
coordinator 进行解析,分析,执行计划
然后分发到worker上去执行
能够整合很多其他的大数据开源框架进行使用,是通过Connectors机制来完成

大数据平台建设的思考

1)项目与平台的差异化
Hadoop,hive,spark,flink,storm,scala其实都是大数据开发必备的技术、框架
项目:以功能为主 中小公司/大公司
平台:大公司 提供给用户通用、定制化的解决方案
数据采集,离线计算,实时计算,机器学习,图计算。。。
往往采用简单的Sql和WebUi即可实现功能

2)大数据平台提供的能力
为什么要构建大数据云平台
1)统一管控
数据分散,异构 ==> 信息孤岛
数据的存储,资源 ==> 统一的资源管控 造成资源浪费
2)能力
运维,支撑程度
性能,规模瓶颈

==>
数据湖:数据和资源的共享
云能力:集群,扩容,快速开发

3)大数据平台功能
图示

4)数据湖架构
图示

5)数据存储和计算
存储和计算:大数据的思想 ==>hadoop
公有云
多租户 多用户
==> 数据安全
==> 不同的租户或者用户登录到系统之后,能访问的数据的权限是不一样的
==> Table :行 列
==> 数据权限控制到行和列级别
==> 使用的资源也是不一样的 queue
私有云

有多个集群:热 冷 备份
数据迁移 自动,压缩,小文件合并
当数据迁移时(热->冷),计算能力是否能够无缝对接

隔离:队列隔离,资源隔离
6)资源

资源分配如何最优,资源使用率最大化
Spark/Flink
spark-submit 多少个executor,每个executor多少Memory,每个executor多少core?

调度框架:AZ,Oozie,crontab
一个作业涉及多个Job:workflow
多个job之间如何做资源隔离

Spark/Flink on YARN: 申请资源是需要一定时间的,调优使得申请资源的时间减少

7)兼容性
假设数据平台是基于CDH/HDP来构建的
但是:用户现在已有的数据是基于阿里,华为云来开发和维护的
务必要提前做一件事情:熟悉已有的数据平台的一些功能 vs 我们数据平台的功能

8)执行引擎和运行方式
引擎的适配
离线:Spark Flink
做一个功能:按照我们的产品设计文档,写一份代码,可以同时运行在不用的引擎上
Apache Beam

运行方式的适配
YARN,K8s

9)Spark和Flink的选择性
Spark到现在社区比较成熟,提供的功能也是比较完善的
FlinK实时部分真的不错

****活到老学到老****

学习肯定是以案例进行展开 源码 测试用例

大数据全栈

很多人学习看不起wc:生产上很多的东西都是wc的变种

 

上一篇:个人实验2


下一篇:sparkSql实战案例