【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器-1.触发器

触发器决定了窗口何时由窗口计算函数进行处理。
(触发器就类比枪的扳机,触发后 计算函数开始计算,计算函数在【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

每个窗口分配器都带有一个默认触发器。如果默认触发器不能满足业务需求,就需要自定义触发器。

实现自定义触发器的方法很简单,只需要继承Trigger接口并实现它的方法即可。
Trigger接口有五种方法,允许触发器对不同的事件作出反应,
具体如下:

  1. onElement()方法:每个元素被添加到窗口时调用;
  2. onEventTime()方法:当一个已注册的事件时间计时器启动时调用;
  3. onProcessingTime()方法:当一个已注册的处理时间计时器启动时调用;
  4. onMerge()方法:与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态;
  5. clear()方法:执行任何需要清除的相应窗口。
    在这里插入图片描述

触发器通过 TriggerContext 来管理和检查状态。
在触发器中,我们通常会使用 状态 来记录窗口中的一些信息,如已处理的事件数量或累计的值。这些状态决定了窗口是否应当触发计算。

触发器中的 TriggerResult 有几个重要的结果:

CONTINUE:表示窗口继续等待更多的事件,不触发计算。
FIRE:表示触发窗口计算并输出结果。
PURGE:表示删除某些数据,通常在某些特殊场景下使用。

1.1 代码示例

假设股票价格数据流连续不断到达系统,现在需要对到达的数据进行监控,每到达5条数据就触发计算。实现该功能的代码如下:

import java.util.Calendar
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object TriggerTest {
  def main(args: Array[String]) {
 
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    //设置程序并行度
    env.setParallelism(1)
 
    //设置为处理时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
    //创建数据源,股票价格数据流
    val source = env.socketTextStream("localhost", 9999)
 

//指定针对数据流的转换操作逻辑
    val stockPriceStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
    val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(50))
      .trigger(new MyTrigger(5))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
 
    //打印输出
    sumStream.print()
 
    //程序触发执行
    env.execute("Trigger Test")
  }

 
  class MyTrigger extends Trigger[StockPrice, TimeWindow] {
    //触发计算的最大数量
    private var maxCount: Long = _
 
    //记录当前数量的状态
private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
 
    def this(maxCount: Int) {
      this()
      this.maxCount = maxCount      
}
 
    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
           TriggerResult.CONTINUE
}
 

 override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
         TriggerResult.CONTINUE
} 
    override def onElement(element: StockPrice, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      val countState = ctx.getPartitionedState(countStateDescriptor)
      //计数状态加1
      countState.add(1L)      
      if (countState.get() >= this.maxCount) {
        //达到指定指定数量       
        //清空计数状态
        countState.clear()
        //触发计算        
        TriggerResult.FIRE
      } else {
        TriggerResult.CONTINUE
      }
    }
 
    //窗口结束时清空状态
    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
      println("窗口结束时清空状态")
      ctx.getPartitionedState(countStateDescriptor).clear()
    }
 
    //更新状态为累加值
    class Sum extends ReduceFunction[Long] {
      override def reduce(value1: Long, value2: Long): Long = value1 + value2
    } 
  }
}

注意 这一行代码:

val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(50))
      .trigger(new MyTrigger(5))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

在该代码中,MyTrigger 是一个自定义触发器,它控制在窗口中积累一定数量的事件后触发计算。

具体来说,窗口中的数据会根据 StockPrice 的数量来决定是否触发计算,而不是依赖于时间 。

接下来分析代码,

def this(maxCount: Int) {
  this()
  this.maxCount = maxCount      
}

这是它的主构造函数,它接受一个 maxCount 参数,表示在触发器中窗口内允许的最大元素数量。也就是说,窗口中的元素数达到 maxCount 时,触发器会触发计算(即调用 TriggerResult.FIRE)。
.trigger(new MyTrigger(5))表示创建一个 MyTrigger 的实例,并传入一个 maxCount 为 5 的参数,意思是窗口中最大允许 5 个元素,达到该数量后,窗口会触发计算。

private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])

lazy:表示这是一个延迟初始化的变量。只有在第一次使用 countStateDescriptor 时,才会初始化它。这在性能优化上有作用,避免了不必要的初始化开销。

ReducingStateDescriptor 是 Flink 提供的一个状态描述符,它用于定义一个可减少的状态。这个状态会随着事件的到来不断累积,并且可以执行自定义的聚合操作。在 ReducingStateDescriptor 中,状态值的更新是通过 ReduceFunction 来实现的。

在这里,ReducingStateDescriptor[Long] 定义了一个状态,它的值是 Long 类型,并且该状态将执行 聚合操作(即对 Long 类型的值进行合并)。

ReducingStateDescriptor 的构造函数接受三个参数:

1.状态名称:“counter”
这是该状态的名称,用于在 Flink 的状态后端存储中标识该状态。

2.聚合操作:new Sum
这是一个 ReduceFunction 的实例,它定义了如何合并状态。在这个例子中,Sum 类是一个自定义的 ReduceFunction,用于对 Long 类型的值进行加法操作。

3.状态类型:classOf[Long]
这是状态的类型。classOf[Long] 表示状态值的类型是 Long,用于在 Flink 的状态管理中描述状态类型。

ClassOf(Long)这是 Scala 反射机制的语法,用于获取 Long 类型的 Class 对象。它在这里用于指定状态值的类型,以便 Flink 的状态管理能够正确地处理状态。

接下来的几个方法**TriggerResult.CONTINUE* 表示继续,不触发计算。
接下来是onElement方法

val countState = ctx.getPartitionedState(countStateDescriptor)

然后计数器+1,加到5就触发。
看是否到达maxcount。到达就触发,不到达就不触发,

自己的疑惑:才开始看代码的时候我一直纠结ctx( ctx.getPartitionedState(countStateDescriptor))这是从哪里来的,这个是

org.apache.flink.streaming.api.windowing.triggers.Trigger

下面的实例化对象,直接使用即可。

代码最后:
Sum类:它的作用是在状态更新时执行对状态的累加操作。

为什么用 Sum 作为累加器?

由于我们在 Trigger 中的 onElement 方法使用了 ctx.getPartitionedState(countStateDescriptor) 来获取一个 ReducingState(累加状态),这个状态将会不断地被更新,每次一个新元素进入时,都会触发 reduce 操作。

Sum 类就是定义了如何对这个 ReducingState 状态进行累加操作。
ReduceFunction 提供了累加器的逻辑,这样当多个元素进来时,value1 和 value2 就会被相加,最终在窗口中保持一个累积的状态。

上一篇:鸿蒙Flutter环境相关问题