Kafka学习笔记(十二)—Kafka消费者API

一、Kafka消费者简介

Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费同一分区。

  • 如果消费线程大于 patition 数量,则有些线程将收不到消息;
  • 如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;
  • 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。

consumer 采用 pull 模式从 broker 中读取数据。

这是因为push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

二、创建Kafka消费者

读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。
四个基本属性

  • bootstrap.servers
  • key.deserializer
  • value.deserializer
  • group.id

代码示例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
//订阅主题列表
consumer.subscribe(Collections.singletonList("customerCountries"));
//正则表达式匹配订阅主题,匹配所有test开头的主题
consumer.subscribe("test.*");

//
try {
   while (true) {  //1)
       ConsumerRecords<String, String> records = consumer.poll(100); //2) 			  
       for (ConsumerRecord<String, String> record : records)  //3)
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSONObject json = new JSONObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //4
}

1)使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

Tips:
消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应

三、消费者配置

fetch.min.bytes

允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者,对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力

fetch.max.wait.ms

指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms

max.partition.fetch.bytes

指定了每个分区返回的最多字节数,默认为1M。也就是说,每次KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区

max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期

session.timeout.ms

数设置消费者会话过期时间,默认为3秒。如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。

这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,也就是心跳频率,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。

auto.offset.reset

指定了当消费者第一次读取分区或者上一次的位置太落后时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最开始的消息开始消费)

enable.auto.commit

指定了消费者是否自动提交消费位移,默认为true。时间间隔由auto.commit.interval.ms设置

partition.assignment.strategy

当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:

  • 范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。
  • 轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。

默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),也可以自己实现一个分配策略

client.id

可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用

max.poll.records

控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量

receive.buffer.bytes、send.buffer.bytes

这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大

四、位移提交

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

在第三篇已经分析过位移提交的方式,这里直接进行代码展示:

同步提交

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

异步提交

直接调用commitAsync

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    
    consumer.commitAsync();
}

基于回调的异步提交

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}

如何想进行重试同时又保证提交顺序?
使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)

混合提交

第三篇提到的优雅提交方式,使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功

try {
    while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %s\n",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }
       
       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

提交特定位移

commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理

commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }

代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交),注意,提交的是当前已处理位移的下一条,即代码中的record.offset()+1。

重平衡前同步提交

在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:

  • public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。
  • public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
    
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
   log.error("Unexpected error", e);
} finally {
   try {
       consumer.commitSync(currentOffsets);
   } finally {
       consumer.close();
       System.out.println("Closed consumer and we are done");
   }
}

代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。

从指定位移开始消费

在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费

  • 从分区开始端重新开始消费,使用seekToBeginning(TopicPartition tp)
  • 从分区的最末端消费最新的消息,使用seekToEnd(TopicPartition tp)

场景:
从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets);
    }
}

问题:
在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理

解决方案:
在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费

代码实现:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
        commitDBTransaction();
    }
    
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    } 
}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
    //在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
    consumer.poll(0);
    for (TopicPartition partition: consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            processRecord(record);
            //保存记录结果
            storeRecordInDB(record);
            //保存位移
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        //提交数据库事务,保存消费的记录以及位移
        commitDBTransaction();
    }

seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。

优雅退出

在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException

如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)

代码示例:

//注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
Runtime.getRuntime().addShutdownHook(new Thread() {
          public void run() {
              System.out.println("Starting exit...");
              //调用消费者的wakeup方法通知主线程退出
              consumer.wakeup();
              try {
                  //等待主线程退出
                  mainThread.join();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          } 
});

...

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println(System.currentTimeMillis() + "--  waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" + consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
    System.out.println("Closed consumer and we are done");
}

五、单个消费者指定分区消费

一般情况下我们都是使用消费组(即便只有一个消费者)来消费消息的,因为这样可以在增加或减少消费者时自动进行分区重平衡。

在知道主题和分区的情况下,我们也可以使用单个消费者来进行消费。对于这种情况,我们需要自己给消费者分配消费分区,而不是让消费者订阅(成为消费组)主题

指定分区进行消费代码展示:

List<PartitionInfo> partitionInfos = null;
//获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    //为消费者指定分区
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}

除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。

参考

Kafka权威实战

上一篇:最近面试的几个问题 (三) io 多路复用


下一篇:python学习42——IO模型