Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState

 

ValueState[T]保存单个的值,值的类型为T。

  • get操作: ValueState.value()
  • set操作: ValueState.update(value: T)

实例一

scala version

Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
  extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])

    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
    reading: SensorReading,
    out: Collector[(String, Double, Double)]
  ): Unit = {
    val lastTemp = lastTempState.value()
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    this.lastTempState.update(reading.temperature)
  }
}
Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState

上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。

不同key对应的keyed state是相互隔离的。

  • 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
  • 在open()方法中创建state变量。注意复习之前的RichFunction相关知识。

 

上一篇:反射的妙用:C#通过反射动态生成方法拦截器


下一篇:Flink 侧输出流 DEMO