spark streaming (二)

一、基础核心概念

1、StreamingContext详解 (一)

有两种创建StreamingContext的方式:
             val conf = new SparkConf().setAppName(appName).setMaster(master);
             val ssc = new StreamingContext(conf, Seconds(1));

StreamingContext, 还可以使用已有的SparkContext来创建
              val sc = new SparkContext(conf)
              val ssc = new StreamingContext(sc, Seconds(1));

appName, 是用来在Spark UI上显示的应用名称。 master, 是一个Spark、 Mesos或者Yarn集群的URL, 或者是local[*]。

2、StreamingContext详解 (二)

一个StreamingContext定义之后, 必须做以下几件事情:
                  1、 通过创建输入DStream来创建输入数据源。
                  2、 通过对DStream定义transformation和output算子操作, 来定义实时计算逻辑。
                  3 、 调用StreamingContext的start()方法, 来开始实时处理数据。
                  4、 调用StreamingContext的awaitTermination()方法, 来等待应用程序的终止。 可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
                  5、 也可以通过调用StreamingContext的stop()方法, 来停止应用程序。

需要注意的要点:
                  1、 只要一个StreamingContext启动之后, 就不能再往其中添加任何计算逻辑了。 比如执行start()方法之后, 还给某个DStream执行一个算子。
                  2、 一个StreamingContext停止之后, 是肯定不能够重启的。 调用stop()之后, 不能再调用start()
                  3、 一个JVM同时只能有一个StreamingContext启动。 在你的应用程序中, 不能创建两个StreamingContext。
                  4、 调用stop()方法时, 会同时停止内部的SparkContext, 如果不希望如此, 还希望后面继续使用SparkContext创建其他类型的Context, 比如SQLContext, 那么就用stop(false)。
                  5、 一个SparkContext可以创建多个StreamingContext, 只要上一个先用stop(false)停止, 再创建下一个即可。

3、输入DStream和Receiver详解(一)

输入DStream代表了来自数据源的输入数据流。 在之前的wordcount例子中, lines就是一个输入 DStream( JavaReceiverInputDStream) , 代表了从netcat( nc) 服务接收到的数据流。 除了 文件数据流之外, 所有的输入DStream都会绑定一个Receiver对象,

该对象是一个关键的组件, 用来从数据源接收数据, 并将其存储在Spark的内存中, 以供后续处理。 
                 Spark Streaming提供了三种内置的数据源支持;
                              1、 基础数据源: StreamingContext API中直接提供了对这些数据源的支持, 比如文件、 socket、 Akka Actor等。
                              2、 高级数据源: 诸如Kafka、 Flume、 Kinesis、 Twitter等数据源, 通过第三方工具类提供支持。 这些数据源的使用, 需要引用其依赖。
                              3、 自定义数据源: 我们可以自己定义数据源, 来决定如何接受和存储数据。

4、输入DStream和Receiver详解(二)

如果你想要在实时计算应用中并行接收多条数据流, 可以创建多个输入DStream。 这样就会创建多个 Receiver, 从而并行地接收多个数据流。 但是要注意的是, 一个Spark Streaming Application的 Executor, 是一个长时间运行的任务, 因此,

它会独占分配给Spark Streaming Application的cpu core。从而只要Spark Streaming运行起来以后, 这个节点上的cpu core, 就没法给其他应用使用了。

使用本地模式, 运行程序时, 绝对不能用local或者local[1], 因为那样的话, 只会给执行输入DStream的 executor分配一个线程。 而Spark Streaming底层的原理是, 至少要有两条线程, 一条线程用来分配给 Receiver接收数据, 一条线程用来处理接收到的数据。

因此必须使用local[n], n>=2的模式。     (n不能大于当前节点的CPU核数)
                    如果不设置Master, 也就是直接将Spark Streaming应用提交到集群上运行, 那么首先, 必须要求集群 节点上, 有>1个cpu core, 其次, 给Spark Streaming的每个executor分配的core, 必须>1, 这样, 才能保证分配到executor上运行的输入DStream,

两条线程并行, 一条运行Receiver, 接收数据; 一条处 理数据。 否则的话, 只会接收数据, 不会处理数据。

总结:Receiver接收器

Receiver接收器,可以接收外部数据源中的数据,并将其保存到
内存中,以供后续使用。
在Spark中支持三大类型的数据源:
1、基础数据源:比如文件、Socket、Akka中的数据。
2、高级数据源,比如Flume、Kafka、推特中的数据。
3、自定义数据源。
补充:在Spark Streaming中,可以通过两种方式操作Kafka的数据。
一种是通过Receiver的方式,另一种Direct直接读取的方式。

5、输入DStream之基础数据源
               (1)Socket: 之前的wordcount例子,

object WordCoundStreaming {
def main(args: Array[String]): Unit = {
/**
* 处理Spakr Streaming程序至少需要2个线程,其中,
* 一个线程负责接收输入的数据
* 另一个线程负责处理接收的数据
* local[N] N>=2
*/
val conf=new SparkConf().setAppName("WordCoundStreaming")
.setMaster("local[2]")
/**
* SparkContext是用户与Spark集群交互的唯一接口,所以SparkContext是必需的。
* 在创建StreamingContext的过程中,Spark会在源码中自动创建一个SparkContext对象。
* 注意第二个参数Seconds(*),表示实时流数据中每批数据的时间间隔,
* 也就是说,在DStream离散流中的每个RDD包含相应时间间隔的数据。
*/
val ssc=new StreamingContext(conf,Seconds(5))
/**
* 通过socketTextStream()获取nc服务器中的数据
* 需要指明获取nc服务器的节点名称和端口,这是的端口要和运行nc服务器的端口一致。
* 此时产生的lines不是RDD,而是一个DStream离散流。
*/
val lines=ssc.socketTextStream("tgmaster",9999)
/**
* 对离散流lines进行flatMap转换操作,实际上是对lines离散流中的每个RDD都进行
* flatMap操作,从而产生了新的RDD,多个新的RDD构成了新的离散流words。
*/
val words=lines.flatMap(_.split(" "))
val pairs=words.map((_,1))
val result=pairs.reduceByKey(_+_) //在控制台输出内容
result.print()
//启动StreamingContext
ssc.start()
//等待程序停止
ssc.awaitTermination()
}
}

    (2)基于HDFS文件的实时计算, 其实就是, 监控一个HDFS目录, 只要其中有新文件出现, 就实时处理。相当于处理实时的文件流。 
               streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
               streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
               Spark Streaming会监视指定的HDFS目录, 并且处理出现在目录中的文件。
                要注意的是, 所有放入HDFS目录中的文件, 都必须有相同的格式; 必须使用移动或者重命名的方式,将文件移入目录; 一旦处理之后, 文件的内容即使改变, 也不会再处理了; 基于HDFS文件的数据源是没有Receiver的, 因此不会占用一个cpu core。

 def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("word").setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
val lines=ssc.textFileStream("hdfs://liuwei1:9000/homework")
val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}

二、DStream的transformation

spark streaming (二)

spark streaming (二)

总结:DStream中Transformation类型的算子有以下三个最重要
1)updateStateByKey
可以为每个Key保留一份状态,并更新状态的值。
案例:在全局范围内统计每个单词出现的次数。
2)transform
可以执行RDD到RDD的操作,相当于对DStream API的一个补充。
3)window
滑动窗口操作,需要指明两个参数,一个是窗口的长度,另一个是
窗口滑动的时间间隔。

三、updateStateBykey  (全局范围之内处理数据,而不是一批一批的)
      updateStateByKey
    updateStateByKey操作, 可以让我们为每个key维护一份state, 并持续不断的更新该state。
          1、 首先, 要定义一个state, 可以是任意的数据类型;
          2、 其次, 要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。
    对于每个batch, Spark都会为每个之前已经存在的key去应用一次state更新函数, 无论这个key在batch中是否有新的数据。 如果state更新函数返回none, 那么key
对应的state就会被删除。 当然, 对于每个新出现的key, 也会执行state更新函数。
注意, updateStateByKey操作, 要求必须开启Checkpoint机制。
案例: 基于缓存的实时wordcount程序( 在实际业务场景中, 这个是非常有用的)

object updateStateByKey {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("updateStateByKey")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
//updateStateByKey操作,要求必须开启Checkpoint机制。
ssc.checkpoint("hdfs://tgmaster:9000/in/ch") val lines=ssc.socketTextStream("tgmaster",9999)
val pairs=lines.flatMap(_.split(" "))
.map((_,1))
/**
* values:多个新值的集合
* state:是一个Option类型的状态,
* 可以通过state.getOrElse(0)为newValue设置初始值
*/
val result=pairs.updateStateByKey((values:Seq[Int],state:Option[Int])=>{
//创建newValue并设初始值0
var newValue=state.getOrElse(0)
/**
* 遍历values集合的新值,用以改变原先的旧值
*/
for(value <- values){
newValue+=value
}
/**
* updateStateByKey算子需要一个Option类型的返回值,
* 所以将更新之后的新值作为Option返回。
*/
Option(newValue)
}) result.print()
ssc.start()
ssc.awaitTermination()
}
}

四、transform

transform操作, 应用在DStream上时, 可以用于执行任意的RDD到RDD的转换操作。 它可以用于实现, DStream API中所没有提供的操作。 比如说, DStream API中, 并没有提 供将一个DStream中的每个batch, 与一个特定的RDD进行join的操作。

但是我们自己就 可以使用transform操作来实现该功能。
     DStream.join(), 只能join其他DStream。 在DStream每个batch的RDD计算出来之后, 会 去跟其他DStream的RDD进行join。
案例: 广告计费日志实时黑名单过滤

/**
* Created by tg on 3/29/17.
* 实时黑名单过滤
*/
object transformDemo {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("transformDemo")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5)) //模拟数据,创建黑名单RDD,(String,Boolean) (name,Boolean)
val blackRDD=ssc.sparkContext.parallelize(Array(("tom",true)))
//从nc服务器中获取数据,nc服务器中的数据格式:time name,比如: 1101 jack
val linesDStream=ssc.socketTextStream("tgmaster",9999)
val mapDStream=linesDStream.map(line=>{
val log=line.split(" ")
(log(1),line) //(name,time name)
})
/**
* tansform()算子可以执行RDD到RDD的转换操作
*/
mapDStream.transform(adsRDD=>{
//让adsRDD与blackRDD进行leftOuterJoin左外连接操作,操作之后的数据包含所有的用户
// (String,(String,Boolean))
val joinRDD=adsRDD.leftOuterJoin(blackRDD)
val filterRDD=joinRDD.filter(x=>{
/**
* 当if条件成立时,意味着是黑名单人员,返回false将其删除。
* 当if条件不成立时,意味着不是黑名单人员,返回true将其保留。
*/
if(x._2._2.getOrElse(false)) false else true
})
filterRDD.map(x=>{
x._2._1
})
}).print() ssc.start()
ssc.awaitTermination()
}
}

五、window滑动窗口

Spark Streaming提供了滑动窗口操作的支持, 从而让我们可以对一个滑动窗口内的数据执行计算 操作。
     每次掉落在窗口内的RDD的数据, 会被聚合起来执行计算操作, 然后生成的RDD, 会作为window DStream的一个RDD。 比如下图中, 就是对每三秒钟的数据执行一次滑动窗口计算, 这3秒内的3个 RDD会被聚合起来进行处理, 然后过了两秒钟, 又会对最近三秒内的数据执行滑动窗口计算。 所以 每个滑动窗口操作, 都必须指定两个参数, 窗口长度以及滑动间隔, 而且这两个参数值都必须是 batch间隔的整数倍。 ( Spark Streaming对滑动窗口的支持, 是比Storm更加完善和强大的) 
spark streaming (二)

window滑动窗口操作
spark streaming (二)

案例:
      热点搜索词滑动统计, 每隔5秒钟, 统计最近20秒钟的搜索词的搜索频次, 并打印 出排名最靠前的3个搜索词以及出现次数

package SparkCore.day1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext} /**
* Created by tg on 3/29/17.
* 热点搜索词滑动统计,每隔5秒钟,统计最近20秒钟的搜索词的搜索频次,
* 并打印出排名最靠前的3个搜索词以及出现次数。
*/
object reduceByKeyAndWindowDemo {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("reduceByKeyAndWindowDemo")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(1))
//从nc服务器中获取数据,数据格式:name keyword,比如:张三 Spark
val linesDStream=ssc.socketTextStream("tgmaster",9999)
//取出每行的搜索关键词
val wordsDStream=linesDStream.map(x=>x.split(" ")(1))
val pairsDStream=wordsDStream.map(x=>(x,1))
/**
* 通过滑动窗口进行统计,每隔5秒钟,统计最近20秒钟的搜索词出现次数。
* reduceByKeyAndWindow()算子中,
* 第一部分v1+v2用于计算搜索词出现次数
* 第二部分Seconds(20)用于设置窗口的长度
* 第三部分Seconds(5)用于设置窗口的时间间隔
*/
val resultDStream=pairsDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>
v1+v2,Seconds(20),Seconds(5)) resultDStream.transform(itemRDD=>{
val result=itemRDD.map(x=>(x._2,x._1))
.sortByKey(false).take(3)
.map(x=>(x._2,x._1)) val resultRDD=ssc.sparkContext.parallelize(result)
resultRDD
}).print() ssc.start()
ssc.awaitTermination()
}
}

  

六、output操作及foreachRDD

1、output 操作

spark streaming (二)

DStream中的所有计算, 都是由output操作触发的, 比如print()。 如果没有任何 output操作, 那么, 压根儿就不会执行定义的计算逻辑。

此外, 即使你使用了foreachRDD output操作, 也必须在里面对RDD执行action操 作, 才能触发对每一个batch的计算逻辑。 否则, 光有foreachRDD output操作,在里面没有对RDD执行action操作, 也不会触发任何逻辑。

2、foreachRDD详解

通常在foreachRDD中, 都会创建一个Connection, 比如JDBC Connection, 然后通过 Connection将数据写入外部存储

误区一: 在RDD的foreach操作外部, 创建Connection 这种方式是错误的, 因为它会导致Connection对象被序列化后传输到每个Task中。 而这 种Connection对象, 实际上一般是不支持序列化的, 也就无法被传输 
                  dstream.foreachRDD { rdd =>
                   val connection = createNewConnection()
                  rdd.foreach { record => connection.send(record)
                }
                }

误区二: 在RDD的foreach操作内部, 创建Connection 这种方式是可以的, 但是效率低下。 因为它会导致对于RDD中的每一条数据, 都 创建一个Connection对象。 而通常来说, Connection的创建, 是很消耗性能的。 
                   dstream.foreachRDD { rdd =>
                    rdd.foreach { record =>
                   val connection = createNewConnection()
                   connection.send(record)
                   connection.close()
                   }
                  }

合理方式一: 使用RDD的foreachPartition操作, 并且在该操作内部, 创建Connection对象, 这样就相当于是, 为RDD的每个partition创建一个 Connection对象, 节省资源的多了。 
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}

            合理方式二: 自己手动封装一个静态连接池, 使用RDD的foreachPartition操作, 并且在 该操作内部, 从静态连接池中, 通过静态方法, 获取到一个连接, 使用之后再还回去。 这样的话, 甚至在多个RDD的partition之间, 也可以复用连接了。 而且可以让连接池采取
懒创建的策略, 并且空闲一段时间后, 将其释放掉。

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
}
}

案例:
     改写UpdateStateByKeyWordCount, 将每次统计出来的全局的单词计数, 写入一份, 到MySQL数据库中。

补充:

一、mysql数据库中varchar与char的区别

varchar 可变的 ;name varchar(100)   赋值时赋了10 ,不占用多余的空间,节省资源空间

char 固定的,name varchar(100) 赋值为10,占用多余空间,浪费资源空间

char在查询检索数据时,char类型的都是固定长度,所以直接查询检索数据,但是对于varchar类型的存储长度空间不一致,所以在查询检索数据之前,需要确定数据的存储长度。然后再查询数据,这样就比char类型的多了一层确定数据存储长度的操作,因此的那个数据量非常庞大时,会

影响查询数据效率

建表:
create table wordcount (
id integer auto_increment primary key,
updated_time timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
word varchar(255),
count integer
);
添加数据:
insert into 表名(列1,列2,....) values (值1,值2,....)

上一篇:mybatis Unable to load authentication plugin 'caching_sha2_password'.] with root cause


下一篇:Spark练习之通过Spark Streaming实时计算wordcount程序