0 简介
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
算子状态(operator state)
键控状态(keyed state)
1 算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink为算子状态提供三种基本数据结构:
(1)列表状态(List state)
将状态表示为一组数据的列表。
(2)联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
(3)广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
2 键控状态(keyed state)
2.1 键控状态 (keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
Flink的Keyed State支持以下数据类型:
ValueState[T] 保存单个值,值得类型为T
get操作: ValueState.value()
set操作: ValueState.update(value: T)
ListState[T]保存一个列表,列表里的元素的数据类型为T
ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.get()返回Iterable[T]
ListState.update(values: java.util.List[T])
MapState[K, V]保存Key-Value对
MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)
ReducingState[T]
AggregatingState[I, O]
State.clear()是清空
通过RuntimeContext注册StateDescriptor。
StateDescriptor以状态state的名字和存储的数据类型为参数
在open方法中创建state变量(如果直接初始化,会抛出异常,Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized.)
需求:如果连续两次水位差超过40cm,发生预警信息。
1 object AlarmTest { 2 def main(args: Array[String]): Unit = { 3 4 // TODO 需求:如果连续两次水位差超过40cm,发生预警信息。 5 6 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 7 env.setParallelism(1) 8 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 9 10 //val dataDS: DataStream[String] = env.readTextFile("input/sensor-data2.log") 11 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 12 val waterDS = dataDS.map( 13 data=>{ 14 val datas = data.split(",") 15 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 16 } 17 ) 18 19 // 设定数据的事件时间已经定义Watermark 20 val markDS: DataStream[WaterSensor] = waterDS.assignAscendingTimestamps(_.ts * 1000) 21 22 // TODO 对分区后的数据进行处理 23 markDS.keyBy(_.id) 24 .process( new KeyedProcessFunction[String, WaterSensor, String] { 25 26 private var lastWaterVal : ValueState[Int] = _ 27 28 override def open(parameters: Configuration): Unit = { 29 lastWaterVal = getRuntimeContext.getState[Int]( 30 new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int]) 31 ) 32 } 33 34 // TODO 当水位差超过40cm,马上预警 35 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext, out: Collector[String]): Unit = { 36 out.collect("水位差超过40cm") 37 } 38 39 override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = { 40 // TODO 当前水位应该减去上一次记录的水位是否超过40cm 41 if ( (value.vc - lastWaterVal.value()) > 40 ) { 42 ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000) 43 } 44 lastWaterVal.update(value.vc) 45 46 } 47 } ).print("alarm>>>>") 48 markDS.print("mark>>>>>>>") 49 env.execute() 50 } 51 }
也可以用lazy懒加载方式,
private lazy var lastWaterVal : ValueState[Int] =getRuntimeContext.getState[Int]( new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int])
————————————————
版权声明:本文为CSDN博主「hyunbar」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/hyunbar/article/details/105109565