During consumption, a consumer may hit failures such as a power loss or a machine crash; after recovery it must resume from where it left off. Kafka therefore keeps a record of the offset each consumer has reached, so that consumption can continue after a failure.
Kafka provides two ways to maintain offsets: automatic commit and manual commit.
1. Automatic offset commit
To commit offsets automatically, the relevant configuration parameters (enable.auto.commit and auto.commit.interval.ms) must be enabled. The following code demonstrates automatic offset commits in Kafka.
package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by liulei on 2020/7/27.
 */
public class AutoConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Broker connection settings
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node2:9092,node3:9092,node4:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable automatic offset commits
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval between automatic offset commits, in milliseconds
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Note: these must be Deserializer classes, not Serializer classes
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
2. Manual offset commit
Although automatic commit is convenient, in real production it is hard for a developer to control exactly when offsets are committed, so Kafka also provides an API for committing offsets manually.
There are two ways to commit offsets manually: commitSync (synchronous) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (although uncontrollable factors can still make a commit fail), whereas commitAsync has no retry mechanism, so its commits may fail.
The synchronous approach is clearly more reliable; the following code demonstrates it.
package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by jihn88 on 2020/7/27.
 */
public class SyncCommit {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Broker connection settings
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node2:9092,node3:9092,node4:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable automatic commits; offsets will be committed manually
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("testAPI"));
        boolean flag = true;
        while (flag) {
            // Pull a batch of records from the brokers
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Synchronous commit: blocks the current thread until the offsets are committed
            consumer.commitSync();
        }
    }
}
Although synchronous commit is more reliable, it blocks the current thread until the commit succeeds, which significantly reduces throughput. In production, therefore, the asynchronous approach is used more often.
The following is sample code for asynchronous commits.
package com.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * Created by jihn88 on 2020/7/28.
 */
public class AsyncCommit {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Broker connection settings
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node2:9092,node3:9092,node4:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable automatic commits; offsets will be committed manually
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("testAPI"));
        boolean flag = true;
        while (flag) {
            // Pull a batch of records from the brokers
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Asynchronous commit: returns immediately; the callback reports failures
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.out.println("Commit failed for " + offsets);
                    }
                }
            });
        }
    }
}
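In practice the two approaches are often combined: commitAsync keeps the poll loop fast, and one final commitSync on shutdown ensures the last offsets are committed reliably. Below is a minimal sketch of that pattern (the class name CombinedCommit is our own; the broker addresses and topic are reused from the examples above).

package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CombinedCommit {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node2:9092,node3:9092,node4:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("testAPI"));
        boolean flag = true;
        try {
            while (flag) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                // Fast, non-blocking commit inside the loop; an occasional
                // failed commit is acceptable because a later one supersedes it
                consumer.commitAsync();
            }
        } finally {
            try {
                // One last blocking commit so the final offsets are not lost
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}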
In fact, whether commits are synchronous or asynchronous, messages can still be missed or consumed twice: committing the offset before processing the data can cause messages to be missed, while processing the data before committing the offset can cause messages to be consumed repeatedly.
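When finer control over the commit point is needed, the commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) overload can commit an explicit offset per partition, after that partition's records have been processed. Below is an illustrative sketch (the class name PartitionCommit is our own; the rest of the setup mirrors the examples above); note that the committed offset should be the offset of the next message to read, hence the +1.

package com.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class PartitionCommit {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node2:9092,node3:9092,node4:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("testAPI"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                // Processing happens before the commit, so a crash between the two
                // can only replay this partition's current batch (at-least-once)
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(
                        partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}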