Kafka 消费者API

       由于Kafka在消费过程中有可能会遇到断电宕机等故障,Consumer恢复后,需要从上次消费结束的地方接着消费,因此Kafka需要时刻记录Consumer消费到哪个offset,以便故障恢复后接着消费。

       Kafka提供两种方法用来维护offset,一种是自动提交,还有一种是手动提交。

    1.  在自动消费时,需要开启相关的配置参数

       kafka自动提交offset代码如下

       

package com.consumer;



import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by liulei  on 2020/7/27.
 */
public class AutoConsumer {
    public static void main(String []args){
        Properties prop = new Properties();
        //broker参数配置
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092,node3:9092,node4:9092");

        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //配置自动提交offset
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

        //配置自动提交offset的时间间隔
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //创建消费者对象
        KafkaConsumer<String,String>  consumer = new KafkaConsumer<String, String>(prop);

        consumer.subscribe(Arrays.asList("first"));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value());
            }
        }

    }
}

2.手动提交offset

虽然自动提交offset方式很简捷,但是实际生产过程中,用自动提交方式,开发者很难把握offset提交的时机,因此Kafka另外还提供了手动提交offset的API。

Kafka手动提交offset的方式有两种,一种是commitSync(同步提交),另一种是commitAsync(异步提交)。这两种方式都会将本次poll的一批数据最大的偏移量提交,不同点是commitSync会阻塞当前线程,直到提交成功,并且会自动失败重试(由不可控因素导致,也会导致提交失败),而commitAsync则没有失败重试机制,故有可能提交失败。

 显然同步提交方式更可靠,一下为同步提交方式的代码。

package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by jihn88 on 2020/7/27.
 */
public class SyncCommit {
    public static void main(String []args){
        Properties prop = new Properties();
        //broker参数配置
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092,node3:9092,node4:9092");

        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //配置手动提交
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //创建消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);

        //消费者订阅主题
        consumer.subscribe(Arrays.asList("testAPI"));

        boolean flag = true;
        while(flag){
            //消费者拉取数据,将数据封装到record对象中
            ConsumerRecords<String,String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = &d, key = &s, value = &s%n",
                        record.offset(),record.key(),record.value());
            }

            //同步提交,当前线程会阻塞直到offset提交成功
            consumer.commitSync();
        }
   }
}

虽然同步提交offset更加可靠,但是由于其会一直阻塞当前线程直至提交成功,因此会严重降低Kafka集群的吞吐量,实际生产环境中,会更多的选用也不提交的方式。

以下是异步提交的示例代码。

package com.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * Created by jihn88 on 2020/7/28.
 */
public class AsyncCommit {
    public static void main(String []args){
        Properties prop = new Properties();
        //broker参数配置
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092,node3:9092,node4:9092");

        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //配置手动提交
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //创建消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);

        //消费者订阅主题
        consumer.subscribe(Arrays.asList("testAPI"));

        boolean flag = true;
        while(flag){
            //消费者拉取数据,将数据封装到record对象中
            ConsumerRecords<String,String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = &d, key = &s, value = &s%n",
                        record.offset(),record.key(),record.value());
            }

            //异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if(exception != null){
                        System.out.println("Commit failed for"+offsets);
                    }
                }
            });
        }
    }
}

其实无论是同步提交还是异步提交,都有可能造成漏消费或重复消费。先提交offset后消费,可能造成数据的漏消费,先提交offset后消费,有可能造成数据的重复消费。

 

Kafka 消费者API

上一篇:[转载]MySql常用命令总结


下一篇:6.0 DOM API