11.2.7、flink核心_一次消费数据,唯一一次Exactly once,生产端的实现,开启事务,幂等性,ack机制

1、kafka生产端

唯一一次Exactly once
往kafka里写数据时保证只有一条数据
11.2.7、flink核心_一次消费数据,唯一一次Exactly once,生产端的实现,开启事务,幂等性,ack机制

(1)幂等性:kafka 0.11之后,Producer的send操作现在是幂等的,在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次
(2)事务
(3)kafka ack机制+副本(生产环境一般2个副本)
—ack设置为0:生产者发送完就 完成了,不管成功不成功
—ack设置为1:一个分区收到数据后返回成功
—ack设置为all或-1:所有的分区收到数据后返回完成,topic多副本不会丢失数据
11.2.7、flink核心_一次消费数据,唯一一次Exactly once,生产端的实现,开启事务,幂等性,ack机制

2、代码实现事务(生产端)

–product.send 发送数据时数据已经在kafka中了,这时数据是一个未提交的状态
—所以设置kafka消费数据类型为提交的数据,就可以保证事务了
—kafka读取已提交的数据参数 --isolation-level read_committed

object Demo04Trans {
  def main(args: Array[String]): Unit = {
    /**
      * 创建kafka的连接
      */
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "ghtf")
    properties.setProperty("transactional.id", "ghfdgtf")

    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //生产者
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)

    /**
      * 事务
      */
    //初始化事务
    producer.initTransactions()
    //开始事务
    producer.beginTransaction()
    //---product.send 发送数据时数据已经在kafka中了,这时数据是一个未提交的状态
    //---所以设置kafka消费数据类型为提交的数据,就可以保证事务了
    producer.send(new ProducerRecord[String, String]("trans", "java"))
    Thread.sleep(5000)
    producer.send(new ProducerRecord[String, String]("trans", "spark"))

    //提交事务
    producer.commitTransaction()
    //数据刷写
    producer.flush()
    producer.close()
    //kafka读取提交的数据参数  --isolation-level read_committed
    // kafka-console-consumer.sh --bootstrap-server  master:9092 --isolation-level read_committed --from-beginning --topic trans
  }
}
上一篇:Flink 是如何统一批流引擎的


下一篇:一文了解Flink State Backends