这里将介绍Flink对有状态计算的支持,其中包括状态计算和无状态计算的区别,以及在Flink中支持的不同状态类型,分别有 Keyed State 和 Operator State 。另外针对状态数据的持久化,以及整个 Flink 任务的数据一致性保证,Flink 提供了 Checkpoint 机制处理和持久化状态结果数据,随后对状态数据 Flink 提供了不同的状态管理器来管理状态数据,例如: MemoryStateBackend 等。
有状态计算
在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一。有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用。如图所示:
状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成更加复杂的计算逻辑。和状态计算不同的是,无状态计算不会存储计算过程中产生的结果,也不会将结果用于下一步计算过程中,程序只会在当前的计算流程中实行计算,计算完成就输出结果,然后下一条数据接入,然后处理。
无状态计算实现的复杂度相对较低,实现起来比较容易,但是无法完成提到的比较复杂的业务场景,例如:
- [ ] 用户想实现CEP(复杂事件处理),获取符合某一特定时间规则的事件,状态计算就可以将接入的事件进行存储,然后等待符合规则的事件触发;
- [ ] 用户想要按照 minutes / hour / day 等进行聚合计算,求取当前最大值、均值等聚合指标,这就需要利用状态来维护当前计算过程中产生的结果,例如事件的总数、总和以及最大,最小值等;
- [ ] 用户想在 Srteam 上实现机器学习的模型训练,状态计算可以帮助用户维护当前版本模型使用的参数;
- [ ] 用户想使用历史的数据进行计算,状态计算可以帮助用户对数据进行缓存,使用户可以直接从状态中获取相应的历史数据。
Flink 状态及应用
状态类型
在 Flink 中根据数据集是否根据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State) 两种类型。
Keyed State
表示和key相关的一种state ,只能用于 KeyedStream 类型数据集对应的Functions和Operators之上。Keyed State 是 Operator State 的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应一个 Operator 和 Key 的组合。 Keyed State 可以通过 Key Group 进行管理,主要用于当算子并行度发生变化时,自动重新分布 Keyed State 数据。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的 Key 无关,每个算子实例中持有所有数据元素中的一部分状态数据。 Operator State 支持当算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中 Keyed State 和 Operator State 均具有两种形式,其中一种为托管状态(Managered State)形式,由Flink Runtime 中控制和管理状态数据,并将状态数据转换称为内存Hash tables 或 Recks DB 的对象存储,然后将这些状态数据通过内部接口持久化到 Checkpoints 中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发 Checkpoints 过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成 bytes 数据存储在 Checkpoints 中,当从 Checkpoints 恢复任务时,算子自己在反序列化出状态的数据结构。
Notes: Flink中推荐用户使用 Managered State 管理状态数据,主要原因是:Manager State 能够更好的支持状态数据的重平衡以及更加完善的内存管理。
Managered Keyed State
Flink 有以下Managered Keyed State 类型可以使用,每种状态都有相应的的使用场景,用户可以根据实际需求选择使用。
- [ ]
ValueState[T]
: 与 Key 对应单个值的状态,例如统计 user_id 对应的交易次数,每次用户交易都会在 count 状态值上进行更新。 ValueState 对应的更新方法是update(T)
, 取值是T value()
; - [ ]
ListState[T]
: 与 Key 对应元素列表的状态,状态中存放元素的 List 列表。例如定义 ListValue存储用户经常访问的 IP 地址。在 ListState 中添加元素使用add(T) , addAll(List[T])
两个方法。获取元素使用Iterable<T> get()
方法,更新元素使用update(List[T])
方法; - [ ]
ReducingState[T]
: 定义与 Key 相关的数据元素单个聚合值的状态,用户存储经过指定 ReduceFunction 计算之后的指标,因此,ReduceState 需要指定ReduceFunction 完成状态数据的聚合。ReducingState 添加元素使用add(T)
方法,获取元素使用T get()
; - [ ]
AggregeateState[IN,OUT]
: 定义 与key相关的数据元素单个聚合值的状态,用于维护数据经过指定 AggregateFunction 计算之后的指标。和ReducingState相比,AggregeateState 的输入输出类型不一定相同,但ReducingState 输入/出 类型必须保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成状态数据的聚合操作。AggregatringState添加元素使用add(IN)
方法, 获取元素使用OUT get()
方法; - [ ]
MapState<UK, UV>
:这会保留一个映射列表。您可以将键值对放入状态并检索Iterable所有当前存储的映射。使用put(UK, UV)
或 添加映射putAll(Map[UK,UV])
(Map<UK, UV>)。可以使用来检索与用户键关联的值get(UK)
。对于映射,键和值可迭代视图可以使用被检索entries()
,keys()
并values()
分别。
Stateful Function定义
示例:
在RichFlatMapFunction 中定义 ValueState,已完成最小值的获取:
inputStream.keyBy(_._1).flatMap(
// (String,Long,Int) 输入类型
// (String,Long,Long) 输出类型
new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
private var leastValueState:ValueState[Long] = _
// 定义状态名称
private var leastValueStateDesc:ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
// 指定状态类型
leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
// 通过 getRuntimeContext.getState 拿到状态
leastValueState = getRuntimeContext.getState(leastValueStateDesc)
}
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
// 通过 value 拿到最小值
val leastValue: Long = leastValueState.value()
// 如果前一个指标大于最小值,则直接输出数据元素和最小值
if ( leastValue != 0L && value._2 > leastValue){
out.collect((value._1 , value._2 , leastValue))
}else{
// 如果当前指标小于最小值,则更新状态中的最小值
leastValueState.update(value._2)
// 将当前数据中的指标作为最小值输出
out.collect(value._1 , value._2 , value._2)
}
}
}).print()
State生命周期
对于任何类型 Keyed State 都可以设定状态生命周期(TTL),以确保能够在规定时间内即时清理状态数据。状态生命周期功能可通过 StateTtlConfig 配置然后将 StateTtlConfig 配置传入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置实例如下所示:
val config: StateTtlConfig = StateTtlConfig
// 指定TTL时长为 5s
.newBuilder(Time.seconds(5))
// 指定TTL 刷新只对创建和写入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 指定状态可见性不返回过期数据
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
leastValueStateDesc.enableTimeToLive(config)
在StateTtlConfig中除了通过 newBuilder() 方法中设定过期时间的参数是必须的之外,其他的参数都是可选的或使用默认值。其中 setUpdateType方法中传入的类型有两种:
- StateTtlConfig.UpdateType.onCreateAndWrite 仅在创建和写入时更新 TTL ;
- StateTtlConfig.UpdateType.OnReadAndWriter 仅在读与写操作都更新 TTL ;
需要注意的是,过期的状态数据根据UpdateType参数进行配置,只有被写入或者读取的是时间才会更新TTL,也就是说如果某个状态指标一直不被使用活着更新,则永远不会触发对该状态数据的清理操作,这种情况可能会导致系统中的状态数据越来越大。
另外,可以通过 setStateVisibility 方法设定状态的可见性,根据过期数据是否被清理来确定是否返回状态数据:
- StateTtlConfig.StateVisibility.NeverReturnExpired: 状态数据过期就不会返回(默认)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 状态数据即使过期但没有被清理依然返回
Scala DataStream API中使用状态
直接上代码片段:
inputStream.keyBy(_._1)
// 指定输入参数类型和状态参数类型
.mapWithState((in:(Int,Long) , count : Option[Int]) =>
// 判断count 类型是否非空
count match {
// 输出 key , count 并在原来 count 数据上累加
case Some(c) => ((in._1 , c) , Some(c + in._2))
// 如果状态为空,则将指标填入
case None => ((in._1 , 0) , Some(in._2))
}
)
Manager Operator State
Operator State 是一种 non-keyed-state ,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 消费端算子实例都对应到 Kafka 的一个分区中,维护Topic分区和 Offsets 偏移量作为算子的 Operator State. 在Flink中可以实现 CheckpointedFunction
或者 ListCheckpoint<T extends Serializable>
两个接口来定义操作 Managered Operator State 的函数。
通过 CheckpointedFunction 接口操作Operator State
CheckpointedFunction 接口定义如图:
@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
在每个独立的算子中,Managered Operator State 都是以 List 形式存储的,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持Manager Operator State 两种重要分布策略,分别是 Event-split Redistribution 和 Union Redistribution。
- [ ] Event-split Redistribution: 每个算子实例中含有部分元素的List列表,整个状态数据是所有List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度相同数量的List列表,每个 task 实例中有一个 List,其可以为空或者含有多个元素。
- [ ] Union Redistribution: 每个算子实例中含有所有状态元素的List 列表,当触发 restore/redistribution 动作时,每个算子可以获取到完整的状态元素列表。
/**
* @title CheckpointCount
* @description 实现 CheckpointFunction 接口利用Operator State 统计输入到算子的数据量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 9:16
*/
class CheckpointCount(val numElements: Int) extends FlatMapFunction[(Int, Long), (Int, Long, Long)] with CheckpointedFunction {
// 定义算子实例本地变量,存储Operator数据数量
private var operatorCount: Long = _
// 定义 keyedState ,存储和 key 相关的状态值
private var keyedState: ValueState[Long] = _
// 定义 operatorState , 存储算子的状态值
private var operatorState: ListState[Long] = _
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
val keyedCount: Long = keyedState.value()
// 更新 keyedState 数量
keyedState.update(keyedCount)
// 更新本地的算子 operatorCount
operatorCount = operatorCount + 1
// 输出结果,包括 id , id 对应的的数量统计 keyedCount ,算子输入数据的数量统计 operatorCount
out.collect(value._1, keyedCount, operatorCount)
}
// 当发生了 snapshotState , 将 operatorCount 添加到 operatorState 中
override def snapshotState(context: FunctionSnapshotContext): Unit = {
operatorState.clear()
operatorState.add(operatorCount)
}
// 初始化状态数据
override def initializeState(context: FunctionInitializationContext): Unit = {
// 定义并获取 keyedState
keyedState = context.getKeyedStateStore.getState(new ValueStateDescriptor[Long]("keye-state", classOf[Long]))
// 定义并获取 operatorState
operatorState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("operator-state", classOf[Long]))
// 定义在 Restored 过程中, 从 operatorState 中恢复数据的逻辑
if (context.isRestored){
val value: util.Iterator[Long] = operatorState.get().iterator()
while (value.hasNext){
operatorCount += value.next()
}
}
}
}
通过 ListCheckpointed接口定义 Operator State
/**
* @title NumberRecordsCount
* @description 实现 ListCheckpoint接口利用Operator State 统计算子输入数据数量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 10:14
*/
class NumberRecordsCount extends FlatMapFunction[(String, Long), (String, Long)] with ListCheckpointed[Long] {
// 定义算子中接入的 numberRecords 数量
private var numberRecords: Long = 0L
override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
// 介入一条计算规则进行统计,并输出
numberRecords += 1
out.collect(value._1, numberRecords)
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
Collections.singletonList(numberRecords)
}
override def restoreState(state: util.List[Long]): Unit = {
numberRecords = 0L
for (count <- state) {
// 从恢复状态中 恢复 numberRecords
numberRecords += count
}
}
}
Checkpoints 和 Savepoints
Checkpoints检查机制
Flink 中基于异步轻量级的分布式快照技术提供了 Checkpoints 容错机制,分布式快找可以将同一时间点 Task / Operator 的状态数据全局统一快照处理,包括前面提到的Keyed State 和 Operator State . Flink 会在输入的数据集上间隔性的生成checkpoint barrier ,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoint 中,当应用出现异常时,Operator 就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。
举个栗子:在 KafkaConsumer 算子维护 Offset 状态,当系统出现问题无法从 Kafka 中消费数据时,可以将 Offset 记录在状态中,当系统出现问题,无法从Kafka消费数据时,可以将 Offset 记录在状态中,当任务重新恢复时就能够指定偏移量消费数据。
Checkpoint 过程中状态数据一般会被保存在一个可配置的环境中,通常在 JobManager节点或者HDFS上。
Checkpoint 开启和时间间隔指定
开启检查点并且指定检查点时间间隔为 1000ms ,根据实际情况自行选择,如果状态比较大,则建议适当增加该值;
environment.enableCheckpointing(1000)
exactly-ance 和 at-least-once 语义
可以选择 exactly-once 语义保证整个应用内 端到端 的数据一致性,这种情况比较适合数据要求高,不允许出现数据丢失或重复,与此同时,Flink 的性能也相对较弱,而 at-least-once 语义更适合于时延和吞吐要求非常高但对数据一致性要求不高的场景。
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Checkpoint 超时时间
超时时间制定了每次Checkpoint 执行过程中的上限时间范围,一旦 Checkpoint 执行时间超过该阈值,Flink 将会中断Checkpoint 过程,并按照超时处理。该指标可以通过 setCheckpointTimeout 方法设定,默认 10 分钟
environment.getCheckpointConfig.setCheckpointTimeout(60000)
检查点之间最小时间间隔
该参数主要目的是设定两个Checkpoint 之间最小时间间隔,防止出现例如状态数据过大导致Checkpoint 执行时间过长,导致 Checkpoint 积压过多,最终Flink 应用密集地触发 Checkpoint 操作,会占用大量计算资源而影响到整个应用的性能
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
最大并行度执行检查点数量
通过 setMaxCurrentCheckpoint()方法设定能够最大同时执行的 Checkpoint 数量。在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率.
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
外部检查点
设定周期性的外部检查点,然后将状态数据持久化到外部系统中,使用这种方式不会在任务正常停止的过程中清理检查点数据,而是会一直保持在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复.
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
failOnCheckpointingErrors
ailOnCheckpointingErrors 参数决定了当Checkpoint执行过程中如果出现失败或者错误时,任务是否同时被关闭,默认值为True
environment.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// 上述的方式已经被弃用了,使用下面的方式
val number: Int = environment.getCheckpointConfig.getTolerableCheckpointFailureNumber
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(number)
Savepoints 机制
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
Operator ID 配置
当使用 Savepoints 对整个集群进行升级或运维操作的时候,需要停止整个 Flink 应用程序,此时用户可能会对应用的代码逻辑进行修改,即时 Flink 能够通过 Savepoint 将应用中的状态数据同步到磁盘然后恢复任务,但由于代码逻辑发生了变化,在升级过程中有可能导致算子的状态无法通过 Savepoints 中的数据恢复的情况,在这种情况下就需要通过唯一的 ID 标记算子。在Flink中默认支持自动生成 Operator ID, 但是这种方式不利于对代码层面的维护和升级,建议用户尽可能使用手工方式对算子进行唯一 ID 标记, ID 的应用范围在每个算子内部,具体的使用方式如下:
environment.addSource(new SourceFunction[] {})
.uid("source-id")
.shuffle()
.map(new MapFunction[] {})
.uid("map-id")
.print()
Savepoints 操作
Savepoint 操作可以通过命令行的方式进行触发,命令行提供了取消任务,从Savepoints中恢复任务,撤销 Savepoints 等操作,在 Flink1.2 中以后也可以通过FlinkWeb页面从 Savepoints中恢复应用。
手动触发 Savepoints
bin/flink savepoint :jobId [:targetDirectory]
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
取消任务并处触发Savepoints
bin/flink cancel -s [:targetDirectory] :jobId
通过Savepoints中恢复任务
bin/flink run -s :savepointPath [:runArgs]
释放Savepoints数据
bin/flink savepoint -d :savepointPath
通过 --dispose (-d) 命令释放已经存储的 Savepoint 数据,这样存储在指定路径中的 savepointPath 将会被清理掉
TargetDirectory 配置
TargetDirectory配置
state.savepoints.dir: hdfs:///flink/savepoints
TargetDirectory 文件目录
# 查看 TargetDirectory 文件目录
hdfs dfs -ls /flink/flink-savepoints/savepoint-11bbc5-bd967f90709b
状态管理器
在Flink 中提供了 StateBackend 来存储和管理 Checkpoints 过程中的状态数据。
StateBackend 类型
Flink中一共实现了三种类型的状态管理器,包括基于内存的MemoryStateBackend
、基于文件系统的 FsStateBackend
, 以及基于 RockDB 作为存储介质的 RockDBStateBackend
.
MemoryStateBackend
基于内存的状态管理器将状态数据全部存储在JVM堆内存中,包括用户在使用 DataStream API 中创建 Key/Value State,窗口中缓存的状态数据,以及触发器等数据基于内存的状态管理器具有非常快速和高校的特点,但也有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中得玩状态数据。因此这个玩意,避免使用。
Flink 将MemoryStateBackend 作为默认的状态后端管理器,也可以通过如下参数配置初始化 MemoryStateBackend , 其中 "MAX_MEN_STATE_SIZE" 指定每个状态值的内存使用大小。
new MemoryStateBackend(MAX_MEN_STATE_SIZE , false)
在Flink 中 MemoryStateBackend 具有如下特点:
- 聚合类算子的状态会存储在 JobManager 内存中,因此对于聚合类算子比较多的应用会对 JobManager 内存有一定的压力,进而对整个集群会造成较大的负担
- 创建MemoryStateBackend时可以指定状态初始化内存大小,但状态数据传输大小会受限于Akka框架通信的“akka.framesize” 大小限制(默认: 10485760 bit -> 1024 * 1024 * 10 )
- JVM内存容量受限于主机内存大小,也就是说不管是 JobManager 内存还是在 TaskManager 的内存中维护状态数据都有内存的限制,因此对于非常大的状态数据不适合使用 MemoryStateBackend 去存储
important MemoryStateBackend 比较适合测试环境,并用于本地调试和验证,不建议在生产环境中使用。
FsStateBackend
与MemoryStateBackend 有所不同,FsStateBackend 是基于文件系统的一种状态管理器在,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统。
new FsStateBackend(path , false)
FsStateBackend 的 Boolean 参数类型指定是否以同步的方式记录状态数据,默认采用异步方式。异步方式可以尽可能避免在Checkpoint过程中影响流式计算任务
RockDBStateBackend
RockDBStateBackend 是Flink 中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend 需要单独引入相关的依赖包到工程中,通过初始化 RockDBStateBackend 类,使可以得到 RockDBStateBackend 实例类。
RocksDBStateBackend 采用异步的方式进行状态数据的 Snapshot ,任务中的状态数据首先被写入 RockDB中,然后再异步的将状态数据写入文件系统中,这样RockDB仅会存储在正在进行的计算的数据,对于长时间才更新的数据则写入磁盘中进行存储,而对于体量比较小的元数据状态,则存储在 JobManager 内存中。
与 FsStateBackend 相比,RockDBStateBackend性能更高,主要是因为借助了 RockDB 存储了最新最热的数据,然后通过异步的方式在同步到文件系统中。