SparkStreaming的DStream转换操作
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
一、状态概述
状态: state,指保存数据的对象
- 有状态的操作:当前算子的运算需要依赖于上一次运算的结果
-
特征:需要保存每一个批次的运算结果!以便下一个批次使用
- 无状态的操作:当前算子的运算,只需要当前批次的数据,不需要历史批次的状态
有状态和无状态操作的区分:
- 区分:
- ①所有有状态的操作,在计算时都需要保存上一次运算结果的状态,在spark中使用ck来保存!
-
如果一个算子,必须要调用ck保存一些数据,他就算是有状态的运算!
- ②有状态的操作,在命名上都会含有state关键字
-
XXXWithState
*/
二、无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
1.1 Transform
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
//transform: 将DS[T]转换为DS[U]
@Test
def testTeansform():Unit={
val sc: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
val ds2: DStream[(String, Int)] = sc.transform(rdd=>rdd.map(x=>(x,1)))
ds2.print(100)
}
1.2 join
两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。
注意点:
- 在一个spark streaming的应用中,通常只能new 一个streamingcontext,所有的操作都是基于这个streamingcontext进行流的操作!!!
- 所有的操作流都来子同一个streamingcontext,因此流的采集周期是一致的!!!
- 只有DS[K,V]类型的才能调用join,在join时必须根据k对相同k的v进行join
@Test
def testJoin():Unit={
val ds1: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
val ds2: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
//只有DS[K,V]类型的才能调用join,在join时必须根据k对相同k的v进行join
val ds3: DStream[(String, Int)] = ds1.map(x=>(x,1))
val ds4: DStream[(String, Int)] = ds2.map(x=>(x,1))
ds3.join(ds4).print(100)
二、有状态转化操作
2.1 UpdateStateByKey
UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的wordcount
1)编写代码
object WorldCount {
/**upstateByKey: 有状态的计算!
* 完成一些流中的一些累积操作! 基于key进行状态的合并计算!
* 上一批数据的运算结果必须是k-v形式,称为state!
* 当前数据也是k-v形式,将当前批数据和上一批数据中相同key的数据进行聚合,计算新的结果(state)!
*
*
* def updateStateByKey[S: ClassTag](
* updateFunc: (Seq[V], Option[S]) => Option[S] //更新state的函数
* Seq[V]:当前批次中,相同key的所有value的集合
* Option[S]: 当前批次的key,在上一个批次中的state
* 返回的还是状态
* ): DStream[(K, S)] = ssc.withScope {
* updateStateByKey(updateFunc, defaultPartitioner())
* }
*
*/
@Test
def testUpgreateByKey():Unit={
val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
streamingContext.checkpoint("ck")
//只有DS[K, V]类型的才可以调用
val ds2: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1))
.updateStateByKey((x: Seq[Int], y: Option[Int]) => Some(x.sum + y.getOrElse(0)))
ds2.print(100)
}
2)启动程序并向9999端口发送数据
nc -lk 9999
Hello World
Hello Scala
3)结果展示
-------------------------------------------
Time: 1504685175000 ms
-------------------------------------------
-------------------------------------------
Time: 1504685181000 ms
-------------------------------------------
(shi,1)
(shui,1)
(ni,1)
-------------------------------------------
Time: 1504685187000 ms
-------------------------------------------
(shi,1)
(ma,1)
(hao,1)
(shui,1)
2.2 WindowOperations
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长:计算内容的时间范围;
- 滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集周期大小的整数倍。
WordCount第三版:3秒一个批次,窗口12秒,滑步6秒。
object WorldCount {
/**
* streaming app中三个和时间相关的参数:
* 举例:在一个采集周期为3s的app中,每间隔6s计算一次,过去9s的数据!
*
* 采集周期:在app构建streamingcontext中指定 3s
* job提交的时间间隔(滑动步长):6s
* job的数据范围(窗口):9s
* 滑动步长和窗口必须是采集周期的整数倍!!!
*
*
* def reduceByKeyAndWindow(
* reduceFunc: (V, V) => V, //窗口内的计算函数
* windowDuration: Duration, //窗口的间隔大小(数据范围) 9s
* slideDuration: Duration //滑动步长的间隔 6s
* ): DStream[(K, V)] = ssc.withScope {
*
*/
@Test
def testWindow:Unit={
val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
val ds2: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1))
val ds3: DStream[(String, Int)] = ds2.reduceByKeyAndWindow(_ + _,
windowDuration = Seconds(9),
slideDuration = Seconds(6))
ds3.print(100)
}
关于Window的操作还有如下方法:
(1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
- 先开窗,再运算!!!
@Test
def testWind3:Unit={
val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
val ds1: DStream[(String, Int)] = ds.flatMap(x=>x.split(" ")).map((_,1))
//先开窗,指定6s滑动一次,窗口大小为6s
val ds2: DStream[(String, Int)] = ds1.window(Seconds(6),Seconds(9))
//之后的任何操作都会在窗口范围内执行
val ds3: DStream[(String, Int)] = ds2.reduceByKey(_+_)
ds3.print(100)
}
- 使用reduceByKeyAndWindow方法,计算时会使用上次的结果,适用于滑动步长太长的情形
/*
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]
结果一样,更高效!(需要使用ck目录保存状态)
*/
@Test
def testWindow2:Unit={
val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop102",3333)
streamingContext.checkpoint("ck")
val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1))
val ds2: DStream[(String, Int)] = ds1.reduceByKeyAndWindow(_ + _, _ - _, Seconds(6),
Seconds(6),
streamingContext.sparkContext.defaultParallelism,
_._2 > 0
)
ds2.print(100)
}