1、kafka生产端
唯一一次Exactly once
往kafka里写数据时保证只有一条数据
(1)幂等性:kafka 0.11之后,Producer的send操作现在是幂等的,在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次
(2)事务
(3)kafka ack机制+副本(生产环境一般2个副本)
—ack设置为0:生产者发送完就 完成了,不管成功不成功
—ack设置为1:一个分区收到数据后返回成功
—ack设置为all或-1:所有的分区收到数据后返回完成,topic多副本不会丢失数据
2、代码实现事务(生产端)
–product.send 发送数据时数据已经在kafka中了,这时数据是一个未提交的状态
—所以设置kafka消费数据类型为提交的数据,就可以保证事务了
—kafka读取已提交的数据参数 --isolation-level read_committed
object Demo04Trans {
def main(args: Array[String]): Unit = {
/**
* 创建kafka的连接
*/
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "ghtf")
properties.setProperty("transactional.id", "ghfdgtf")
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//生产者
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
/**
* 事务
*/
//初始化事务
producer.initTransactions()
//开始事务
producer.beginTransaction()
//---product.send 发送数据时数据已经在kafka中了,这时数据是一个未提交的状态
//---所以设置kafka消费数据类型为提交的数据,就可以保证事务了
producer.send(new ProducerRecord[String, String]("trans", "java"))
Thread.sleep(5000)
producer.send(new ProducerRecord[String, String]("trans", "spark"))
//提交事务
producer.commitTransaction()
//数据刷写
producer.flush()
producer.close()
//kafka读取提交的数据参数 --isolation-level read_committed
// kafka-console-consumer.sh --bootstrap-server master:9092 --isolation-level read_committed --from-beginning --topic trans
}
}