一、Kafka消费者简介
Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费同一分区。
- 如果消费线程大于 patition 数量,则有些线程将收不到消息;
- 如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;
- 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。
consumer 采用 pull 模式从 broker 中读取数据。
这是因为push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
二、创建Kafka消费者
读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。
四个基本属性
- bootstrap.servers
- key.deserializer
- value.deserializer
- group.id
代码示例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
//订阅主题列表
consumer.subscribe(Collections.singletonList("customerCountries"));
//正则表达式匹配订阅主题,匹配所有test开头的主题
consumer.subscribe("test.*");
//
try {
while (true) { //1)
ConsumerRecords<String, String> records = consumer.poll(100); //2)
for (ConsumerRecord<String, String> record : records) //3)
{
log.debug("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close(); //4
}
1)使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。
Tips:
消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应
三、消费者配置
fetch.min.bytes
允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者,对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力
fetch.max.wait.ms
指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms
max.partition.fetch.bytes
指定了每个分区返回的最多字节数,默认为1M。也就是说,每次KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区
max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期
session.timeout.ms
数设置消费者会话过期时间,默认为3秒。如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。
这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,也就是心跳频率,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。
auto.offset.reset
指定了当消费者第一次读取分区或者上一次的位置太落后时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最开始的消息开始消费)
enable.auto.commit
指定了消费者是否自动提交消费位移,默认为true。时间间隔由auto.commit.interval.ms设置
partition.assignment.strategy
当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:
- 范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。
- 轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。
默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),也可以自己实现一个分配策略
client.id
可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用
max.poll.records
控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量
receive.buffer.bytes、send.buffer.bytes
这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大
四、位移提交
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
在第三篇已经分析过位移提交的方式,这里直接进行代码展示:
同步提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
异步提交
直接调用commitAsync
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
基于回调的异步提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
如何想进行重试同时又保证提交顺序?
使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)
混合提交
第三篇提到的优雅提交方式,使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
提交特定位移
commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理
commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
} }
代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交),注意,提交的是当前已处理位移的下一条,即代码中的record.offset()+1。
重平衡前同步提交
在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:
- public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。
- public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。
从指定位移开始消费
在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费
- 从分区开始端重新开始消费,使用seekToBeginning(TopicPartition tp)
- 从分区的最末端消费最新的消息,使用seekToEnd(TopicPartition tp)
场景:
从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
问题:
在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理
解决方案:
在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费
代码实现:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
//在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
//保存记录结果
storeRecordInDB(record);
//保存位移
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
//提交数据库事务,保存消费的记录以及位移
commitDBTransaction();
}
seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。
优雅退出
在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException
如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)
代码示例:
//注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
//调用消费者的wakeup方法通知主线程退出
consumer.wakeup();
try {
//等待主线程退出
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "-- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.position(tp));
consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
五、单个消费者指定分区消费
一般情况下我们都是使用消费组(即便只有一个消费者)来消费消息的,因为这样可以在增加或减少消费者时自动进行分区重平衡。
在知道主题和分区的情况下,我们也可以使用单个消费者来进行消费。对于这种情况,我们需要自己给消费者分配消费分区,而不是让消费者订阅(成为消费组)主题
指定分区进行消费代码展示:
List<PartitionInfo> partitionInfos = null;
//获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
//为消费者指定分区
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。
参考
Kafka权威实战