ValueState[T] holds a single value of type T.
- Read the value: ValueState.value()
- Write the value: ValueState.update(value: T)
Example 1
Scala version
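import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val sensorData: DataStream[SensorReading] = ...
// Partition the stream by sensor id so that state is scoped per sensor.
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
// Emit (id, temperature, difference) when the change exceeds the threshold.
val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // Handle to the keyed state; assigned in open().
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // The descriptor names the state and declares the type of its value.
    val lastTempDescriptor =
      new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
      reading: SensorReading,
      out: Collector[(String, Double, Double)]): Unit = {
    // Last temperature stored for the key of the current reading.
    val lastTemp = lastTempState.value()
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    // Remember the current temperature for the next reading of this key.
    this.lastTempState.update(reading.temperature)
  }
}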
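The FlatMapFunction in the example above can only access the state variable that belongs to the key of the element currently being processed.
Keyed state for different keys is isolated: state stored under one key is never visible while an element with another key is being processed.
Notes on TemperatureAlertFunction:
- The StateDescriptor is registered through the RuntimeContext. A StateDescriptor takes the name of the state and the type of the data it stores as parameters. The data type must be specified because Flink needs it to choose a suitable serializer.
- The state variable is created in the open() method. Review the earlier material on RichFunction if needed.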
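The per-key isolation described above can be illustrated without Flink. The following is a minimal sketch in plain Scala, not Flink's actual implementation: SimpleValueState, KeyedStateSketch, and the sample readings are hypothetical names and data introduced only for this illustration. It keeps one state cell per key in a map and applies the same alert logic. One deliberate simplification: the toy value() returns an Option, so the first reading of a key produces no alert.

```scala
import scala.collection.mutable

// Hypothetical toy stand-in for a single-value state cell (not Flink's ValueState).
final class SimpleValueState[T] {
  private var current: Option[T] = None
  def value(): Option[T] = current            // None until the first update
  def update(v: T): Unit = current = Some(v)
}

final case class SensorReading(id: String, temperature: Double)

object KeyedStateSketch {
  // Returns (id, temperature, difference) for every reading whose temperature
  // differs from the previous reading of the SAME key by more than threshold.
  def alerts(
      readings: Seq[SensorReading],
      threshold: Double): Seq[(String, Double, Double)] = {
    // One state cell per key, created lazily on first access.
    val statePerKey = mutable.Map.empty[String, SimpleValueState[Double]]
    readings.flatMap { r =>
      // Only the cell for the current element's key is looked up.
      val state = statePerKey.getOrElseUpdate(r.id, new SimpleValueState[Double])
      val alert = state.value().flatMap { last =>
        val diff = (r.temperature - last).abs
        if (diff > threshold) Some((r.id, r.temperature, diff)) else None
      }
      state.update(r.temperature)
      alert
    }
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      SensorReading("sensor_1", 20.0),
      SensorReading("sensor_2", 50.0),
      SensorReading("sensor_1", 20.5),
      SensorReading("sensor_2", 53.0)
    )
    alerts(readings, threshold = 1.7).foreach(println)
  }
}
```

In this sketch the readings of sensor_1 and sensor_2 are interleaved, yet each reading is compared only with the previous temperature stored under its own id. That is the behavior keyed state provides inside the function.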