Flink+kafka端到端状态一致性保证
什么是状态一致性
- 有状态的流处理,内部每个算子任务都有自己的状态
- 对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确
- 一条数据不用改丢失,也不应该重复计算
- 在遇到故障时可以恢复状态,恢复以后的重新计算,结果也是完全正确的
Flink内部状态一致性分类
- AT-LEAST-ONCE(至少一次)
大多数真是应用场景,我们希望不丢失时间。这种类型的保障的意思是所有的数据都能得到处理,而且一些时间还可能处理多次
- AT-MOST-ONCE(最多一次)
当任务故障时、最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据,这个语义最多处理一次数据
- **EXACTLY-ONCE(**精准一次)
恰好处理一次是最严格的语义,也是最难实现的,恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅更新一次
Flink通过checkpoint一致性检查点来保证exactly-once的语义,有状态流应用的一致性检查点,其实就是:所有任务的状态,在某个时间点的一份快照,而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。应用状态一致性检查点事flink故障恢复机制的核心。
端到端end-to-end状态一致性
从source -> process ->sink整个流程数据一致性要得到保证,那么如何保证呢?
不丢失数据也不重复消费数据。
1.source 可以设置数据的读取offset
类似于kafka的手从提交offset
2.process
通过checkpoint一致性检查点机制
3.sink端 从故障恢复时,数据不会重复写入外部系统
幂等写入(idempotent writes)
:所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果的更改,也就是说后面的重复执行就不起作用了,比如说向hashMap中多次插入同样(k->v)
事物写入
:ACID【原子性、一致性、隔离性、持久性】
构建的事物对应着checkpoint操作,等到checkpoint真正完成的时候才把所有对应的结果sink到外部系统中
实现方式:
1.WAL预写日志:GenrticWriteAheadSink的DataStreamAPI来实现事物Sink
2.2阶段提交 2PC(two-phase-commit)
:对于每一个checkpoint,sink任务会启动一个事物,并接下来所有接受的数据添加到事务中
然后将这些数据写入外部sink系统,但不提交他们,这时只是预提交;
当它收到checkpoint完成的通知时,它才正式提交,实现结果的真正写入;
这种方式真正实现了exctly-once,它需要一个提供事物支持的外部sink系统。flink提供了TwoPhaseCommitSinkFunction API接口
sink、source | 不可重置消费source | 可重置source |
---|---|---|
任意(Any) | At-most-once | At-least-once |
幂等 | At-most-once | Exactly-once |
预写日志(WAL) | At-most-once | At-least-once |
两阶段提交(2PC) | At-most-once | Exactly-once |
Flink+kafka端到端状态一致性实现
在kafka的0.8以及以前的版本,kafka只能默认将offset存储在zk中;
在kafka的0.9-0.10版本,只能通过以下配置来保证数据的at-least-once语义
出了要开启flink的checkpoint功能,同时还要设置相关配置功能。
因在0.9或者0.10,默认的FlinkKafkaProducer只能保证at-least-once语义,假如需要满足at-least-once语义,我们还需要设置
setLogFailuresOnly(boolean) 默认false
setFlushOnCheckpoint(boolean) 默认true
在0.11及以后版本的kafka,flink能很好的支持exactly-once语义
1.内部,通过checkpoint机制,把状态存盘,发生故障时可以恢复,保证flink内部的一致性
env.enableCheckpointing(60000)
2.source,KafkaConsumer作为source,可以将偏移量保存下来如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性(自动)
//自定义kafkaConsumer,同时可以指定从哪里开始消费
//开启了Flink的检查点之后,我们还要开启kafka-offset的检查点,通过kafkaConsumer.setCommitOffsetsOnCheckpoints(true)开启,
//一旦这个检查点开启,那么之前配置的 auto.commit.enable = true的配置就会自动失效
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
3.sink FlinkkafkaProducer作为Sink,采用两阶段提交的sink,需要实现一个TwoPhaseCommitSinkFunction
// 本身就继承了TwoPhaseCommitSinkFunction,但是我们需要在参数里面传入指定语义,默认时AT-LEAST-ONCE
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
此外还需要进行一些producer容错的配置:
-
除了启用Flink的检查点之外,还可以通过将适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对于Kafka> = 1.0.0版本)
来选择三种不同的操作模式:
-
Semantic.NONE 代表at-mostly-once语义
-
Semantic.AT_LEAST_ONCE(Flink默认设置)
-
Semantic.EXACTLY_ONCE 使用Kafka事务提供一次精确的语义,每当您使用事务写入Kafka时,
* <li>decrease number of max concurrent checkpoints</li> * <li>make checkpoints more reliable (so that they complete faster)</li> * <li>increase the delay between checkpoints</li> * <li>increase the size of {@link FlinkKafkaInternalProducer}s pool</li>
请不要忘记为使用Kafka记录的任何应用程序设置所需的设置: isolation.level(read_committed 或 read_uncommitted-后者是默认值)
注意事项
1.Semantic.EXACTLY_ONCE依赖与下游系统能支持事务操作.以0.11版本为例
transaction.max.timeout.ms
最大超市时长,默认15分钟,如果需要用exactly语义,需要增加这个值。isolation.level
如果需要用到exactly语义,需要在下级consumerConfig中设置read-commited [read-uncommited(默认值)],具体看官网transaction.timeout.ms
默认为1hour
Note1:Semantic.EXACTLY_ONCE模式每个FlinkKafkaProducer011实例使用一个固定大小的KafkaProducers池。每个检查点使用这些生产者中的每一个。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常,并使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
Note2:Semantic.EXACTLY_ONCE采取所有可能的措施,不要留下任何挥之不去的交易,否则这将有碍于消费者更多地阅读Kafka主题。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中将没有有关先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序的FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR
//1。设置最大允许的并行checkpoint数,防止超过producer池的个数发生异常
env.getCheckpointConfig.setMaxConcurrentCheckpoints(5)
//2。设置producer的ack传输配置
// 设置超市时长,默认15分钟,建议1个小时以上
producerConfig.put(ProducerConfig.ACKS_CONFIG, 1)
producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15000)
//3。在下一个kafka consumer的配置文件,或者代码中设置ISOLATION_LEVEL_CONFIG-read-commited
//Note:必须在下一个consumer中指定,当前指定是没用用的
kafkaonfigs.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_commited")
示例完整代码:
package com.shufang.flink.connectors
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSource01 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//这是checkpoint的超时时间
//env.getCheckpointConfig.setCheckpointTimeout()
//设置最大并行的chekpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(5)
env.getCheckpointConfig.setCheckpointInterval(1000) //增加checkpoint的中间时长,保证可靠性
/**
* 为了保证数据的一致性,我们开启Flink的checkpoint一致性检查点机制,保证容错
*/
env.enableCheckpointing(60000)
/**
* 从kafka获取数据,一定要记得添加checkpoint,能保证offset的状态可以重置,从数据源保证数据的一致性
* 保证kafka代理的offset与checkpoint备份中保持状态一致
*/
val kafkaonfigs = new Properties()
//指定kafka的启动集群
kafkaonfigs.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
//指定消费者组
kafkaonfigs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flinkConsumer")
//指定key的反序列化类型
kafkaonfigs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
//指定value的反序列化类型
kafkaonfigs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
//指定自动消费offset的起点配置
// kafkaonfigs.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
/**
* 自定义kafkaConsumer,同时可以指定从哪里开始消费
* 开启了Flink的检查点之后,我们还要开启kafka-offset的检查点,通过kafkaConsumer.setCommitOffsetsOnCheckpoints(true)开启,
* 一旦这个检查点开启,那么之前配置的 auto-commit-enable = true的配置就会自动失效
*/
val kafkaConsumer = new FlinkKafkaConsumer[String](
"console-topic",
new SimpleStringSchema(), // 这个schema是将kafka的数据应设成Flink中的String类型
kafkaonfigs
)
// 开启kafka-offset检查点状态保存机制
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
// kafkaConsumer.setStartFromEarliest()//
// kafkaConsumer.setStartFromTimestamp(1010003794)
// kafkaConsumer.setStartFromLatest()
// kafkaConsumer.setStartFromSpecificOffsets(Map[KafkaTopicPartition,Long]()
// 添加source数据源
val kafkaStream: DataStream[String] = env.addSource(kafkaConsumer)
kafkaStream.print()
val sinkStream: DataStream[String] = kafkaStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
override def extractTimestamp(element: String): Long = {
element.split(",")(1).toLong
}
})
/**
* 通过FlinkkafkaProduccer API将stream的数据写入到kafka的'sink-topic'中
*/
// val brokerList = "localhost:9092"
val topic = "sink-topic"
val producerConfig = new Properties()
producerConfig.put(ProducerConfig.ACKS_CONFIG, new Integer(1)) // 设置producer的ack传输配置
producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Time.hours(2)) //设置超市时长,默认1小时,建议1个小时以上
/**
* 自定义producer,可以通过不同的构造器创建
*/
val producer: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
topic,
new KeyedSerializationSchemaWrapper[String](SimpleStringSchema),
producerConfig,
Semantic.EXACTLY_ONCE
)
// FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR
/** *****************************************************************************************************************
* * 出了要开启flink的checkpoint功能,同时还要设置相关配置功能。
* * 因在0.9或者0.10,默认的FlinkKafkaProducer只能保证at-least-once语义,假如需要满足at-least-once语义,我们还需要设置
* * setLogFailuresOnly(boolean) 默认false
* * setFlushOnCheckpoint(boolean) 默认true
* * come from 官网 below:
* * Besides enabling Flink’s checkpointing,you should also configure the setter methods setLogFailuresOnly(boolean)
* * and setFlushOnCheckpoint(boolean) appropriately.
* ******************************************************************************************************************/
producer.setLogFailuresOnly(false) //默认是false
/**
* 除了启用Flink的检查点之外,还可以通过将适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对于Kafka> = 1.0.0版本)
* 来选择三种不同的操作模式:
* Semantic.NONE 代表at-mostly-once语义
* Semantic.AT_LEAST_ONCE(Flink默认设置)
* Semantic.EXACTLY_ONCE:使用Kafka事务提供一次精确的语义,每当您使用事务写入Kafka时,
* 请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)
*/
sinkStream.addSink(producer)
env.execute("kafka source & sink")
}
}