Flink+kafka端到端状态一致性保证

Flink+kafka端到端状态一致性保证

什么是状态一致性

  • 有状态的流处理,内部每个算子任务都有自己的状态
  • 对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确
  • 一条数据不用改丢失,也不应该重复计算
  • 在遇到故障时可以恢复状态,恢复以后的重新计算,结果也是完全正确的

Flink内部状态一致性分类

  • AT-LEAST-ONCE(至少一次)

大多数真是应用场景,我们希望不丢失时间。这种类型的保障的意思是所有的数据都能得到处理,而且一些时间还可能处理多次

  • AT-MOST-ONCE(最多一次)

当任务故障时、最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据,这个语义最多处理一次数据

  • **EXACTLY-ONCE(**精准一次)

恰好处理一次是最严格的语义,也是最难实现的,恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅更新一次

Flink通过checkpoint一致性检查点来保证exactly-once的语义,有状态流应用的一致性检查点,其实就是:所有任务的状态,在某个时间点的一份快照,而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。应用状态一致性检查点事flink故障恢复机制的核心。

端到端end-to-end状态一致性

从source -> process ->sink整个流程数据一致性要得到保证,那么如何保证呢?

不丢失数据也不重复消费数据。

1.source 可以设置数据的读取offset

类似于kafka的手从提交offset

2.process

通过checkpoint一致性检查点机制

3.sink端 从故障恢复时,数据不会重复写入外部系统

幂等写入(idempotent writes)

:所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果的更改,也就是说后面的重复执行就不起作用了,比如说向hashMap中多次插入同样(k->v)

事物写入

:ACID【原子性、一致性、隔离性、持久性】

构建的事物对应着checkpoint操作,等到checkpoint真正完成的时候才把所有对应的结果sink到外部系统中

实现方式:

1.WAL预写日志:GenrticWriteAheadSink的DataStreamAPI来实现事物Sink

2.2阶段提交 2PC(two-phase-commit

:对于每一个checkpoint,sink任务会启动一个事物,并接下来所有接受的数据添加到事务中

然后将这些数据写入外部sink系统,但不提交他们,这时只是预提交;

当它收到checkpoint完成的通知时,它才正式提交,实现结果的真正写入;

这种方式真正实现了exctly-once,它需要一个提供事物支持的外部sink系统。flink提供了TwoPhaseCommitSinkFunction API接口

sink、source 不可重置消费source 可重置source
任意(Any) At-most-once At-least-once
幂等 At-most-once Exactly-once
预写日志(WAL) At-most-once At-least-once
两阶段提交(2PC) At-most-once Exactly-once

Flink+kafka端到端状态一致性保证
Flink+kafka端到端状态一致性保证

Flink+kafka端到端状态一致性实现

在kafka的0.8以及以前的版本,kafka只能默认将offset存储在zk中;

在kafka的0.9-0.10版本,只能通过以下配置来保证数据的at-least-once语义

出了要开启flink的checkpoint功能,同时还要设置相关配置功能。
因在0.9或者0.10,默认的FlinkKafkaProducer只能保证at-least-once语义,假如需要满足at-least-once语义,我们还需要设置
setLogFailuresOnly(boolean)    默认false
setFlushOnCheckpoint(boolean)  默认true

在0.11及以后版本的kafka,flink能很好的支持exactly-once语义

1.内部,通过checkpoint机制,把状态存盘,发生故障时可以恢复,保证flink内部的一致性

env.enableCheckpointing(60000)

2.source,KafkaConsumer作为source,可以将偏移量保存下来如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性(自动)

//自定义kafkaConsumer,同时可以指定从哪里开始消费
//开启了Flink的检查点之后,我们还要开启kafka-offset的检查点,通过kafkaConsumer.setCommitOffsetsOnCheckpoints(true)开启,
//一旦这个检查点开启,那么之前配置的 auto.commit.enable = true的配置就会自动失效
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

3.sink FlinkkafkaProducer作为Sink,采用两阶段提交的sink,需要实现一个TwoPhaseCommitSinkFunction

// 本身就继承了TwoPhaseCommitSinkFunction,但是我们需要在参数里面传入指定语义,默认时AT-LEAST-ONCE
public class FlinkKafkaProducer<IN>
	extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {

此外还需要进行一些producer容错的配置:

  • 除了启用Flink的检查点之外,还可以通过将适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对于Kafka> = 1.0.0版本)

    来选择三种不同的操作模式:

  • Semantic.NONE 代表at-mostly-once语义

  • Semantic.AT_LEAST_ONCE(Flink默认设置)

  • Semantic.EXACTLY_ONCE 使用Kafka事务提供一次精确的语义,每当您使用事务写入Kafka时,

      * <li>decrease number of max concurrent checkpoints</li>
      * <li>make checkpoints more reliable (so that they complete faster)</li>
      * <li>increase the delay between checkpoints</li>
      * <li>increase the size of {@link FlinkKafkaInternalProducer}s pool</li>
    

    请不要忘记为使用Kafka记录的任何应用程序设置所需的设置: isolation.level(read_committed 或 read_uncommitted-后者是默认值)

注意事项

1.Semantic.EXACTLY_ONCE依赖与下游系统能支持事务操作.以0.11版本为例

transaction.max.timeout.ms 最大超市时长,默认15分钟,如果需要用exactly语义,需要增加这个值。
isolation.level 如果需要用到exactly语义,需要在下级consumerConfig中设置read-commited [read-uncommited(默认值)],具体看官网
transaction.timeout.ms 默认为1hour

Note1:Semantic.EXACTLY_ONCE模式每个FlinkKafkaProducer011实例使用一个固定大小的KafkaProducers池。每个检查点使用这些生产者中的每一个。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常,并使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数

Note2:Semantic.EXACTLY_ONCE采取所有可能的措施,不要留下任何挥之不去的交易,否则这将有碍于消费者更多地阅读Kafka主题。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中将没有有关先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序的FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR

//1。设置最大允许的并行checkpoint数,防止超过producer池的个数发生异常
env.getCheckpointConfig.setMaxConcurrentCheckpoints(5) 
//2。设置producer的ack传输配置
// 设置超市时长,默认15分钟,建议1个小时以上
producerConfig.put(ProducerConfig.ACKS_CONFIG, 1) 
producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15000) 

//3。在下一个kafka consumer的配置文件,或者代码中设置ISOLATION_LEVEL_CONFIG-read-commited
//Note:必须在下一个consumer中指定,当前指定是没用用的
kafkaonfigs.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_commited")

示例完整代码:

package com.shufang.flink.connectors

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSource01 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //这是checkpoint的超时时间
    //env.getCheckpointConfig.setCheckpointTimeout()
    //设置最大并行的chekpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(5)
    env.getCheckpointConfig.setCheckpointInterval(1000) //增加checkpoint的中间时长,保证可靠性


    /**
     * 为了保证数据的一致性,我们开启Flink的checkpoint一致性检查点机制,保证容错
     */
    env.enableCheckpointing(60000)

    /**
     * 从kafka获取数据,一定要记得添加checkpoint,能保证offset的状态可以重置,从数据源保证数据的一致性
     * 保证kafka代理的offset与checkpoint备份中保持状态一致
     */

    val kafkaonfigs = new Properties()

    //指定kafka的启动集群
    kafkaonfigs.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    //指定消费者组
    kafkaonfigs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flinkConsumer")
    //指定key的反序列化类型
    kafkaonfigs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    //指定value的反序列化类型
    kafkaonfigs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    //指定自动消费offset的起点配置
    //    kafkaonfigs.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")


    /**
     * 自定义kafkaConsumer,同时可以指定从哪里开始消费
     * 开启了Flink的检查点之后,我们还要开启kafka-offset的检查点,通过kafkaConsumer.setCommitOffsetsOnCheckpoints(true)开启,
     * 一旦这个检查点开启,那么之前配置的 auto-commit-enable = true的配置就会自动失效
     */
    val kafkaConsumer = new FlinkKafkaConsumer[String](
      "console-topic",
      new SimpleStringSchema(), // 这个schema是将kafka的数据应设成Flink中的String类型
      kafkaonfigs
    )

    // 开启kafka-offset检查点状态保存机制
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

    //    kafkaConsumer.setStartFromEarliest()//
    //    kafkaConsumer.setStartFromTimestamp(1010003794)
    //    kafkaConsumer.setStartFromLatest()
    //    kafkaConsumer.setStartFromSpecificOffsets(Map[KafkaTopicPartition,Long]()

    // 添加source数据源
    val kafkaStream: DataStream[String] = env.addSource(kafkaConsumer)

    kafkaStream.print()

    val sinkStream: DataStream[String] = kafkaStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
      override def extractTimestamp(element: String): Long = {
        element.split(",")(1).toLong
      }
    })


    /**
     * 通过FlinkkafkaProduccer API将stream的数据写入到kafka的'sink-topic'中
     */
    //    val brokerList = "localhost:9092"
    val topic = "sink-topic"
    val producerConfig = new Properties()
    producerConfig.put(ProducerConfig.ACKS_CONFIG, new Integer(1)) // 设置producer的ack传输配置
    producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Time.hours(2)) //设置超市时长,默认1小时,建议1个小时以上

    /**
     * 自定义producer,可以通过不同的构造器创建
     */
    val producer: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](SimpleStringSchema),
      producerConfig,
      Semantic.EXACTLY_ONCE
    )

    //    FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR
    /** *****************************************************************************************************************
     * * 出了要开启flink的checkpoint功能,同时还要设置相关配置功能。
     * * 因在0.9或者0.10,默认的FlinkKafkaProducer只能保证at-least-once语义,假如需要满足at-least-once语义,我们还需要设置
     * * setLogFailuresOnly(boolean)    默认false
     * * setFlushOnCheckpoint(boolean)  默认true
     * * come from 官网 below:
     * * Besides enabling Flink’s checkpointing,you should also configure the setter methods setLogFailuresOnly(boolean)
     * * and setFlushOnCheckpoint(boolean) appropriately.
     * ******************************************************************************************************************/

    producer.setLogFailuresOnly(false) //默认是false


    /**
     * 除了启用Flink的检查点之外,还可以通过将适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对于Kafka> = 1.0.0版本)
     * 来选择三种不同的操作模式:
     * Semantic.NONE  代表at-mostly-once语义
     * Semantic.AT_LEAST_ONCE(Flink默认设置)
     * Semantic.EXACTLY_ONCE:使用Kafka事务提供一次精确的语义,每当您使用事务写入Kafka时,
     * 请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)
     */

    sinkStream.addSink(producer)

    env.execute("kafka source & sink")
  }
}

上一篇:mariadb服务器断电重启之后Missing MLOG_CHECKPOINT的解决办法


下一篇:24、Checkpoint原理剖析