This article gives a brief introduction to consuming Kafka with multiple threads.
Note: the examples below use Kafka 2.2.
Consumer configuration

When a consumer reads messages from Kafka, the following configuration parameters are worth considering.

| Parameter | Description |
|---|---|
| max.poll.records (default=500) | The maximum number of records returned in a single call to poll(). |
| fetch.max.bytes (default=52428800) | The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer; if the first record batch in the first non-empty partition of the fetch is larger than this value, the batch is still returned to ensure the consumer can make progress, so this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. |
| session.timeout.ms (default=10000) | The consumer sends periodic heartbeats to the broker. If the broker receives no heartbeat before this timeout expires, it removes the consumer from the group and initiates a rebalance. |
| max.poll.interval.ms (default=300000) | The maximum delay between two invocations of poll(). If it is exceeded, the broker removes the consumer from the group and initiates a rebalance. |
| heartbeat.interval.ms (default=3000) | How frequently the consumer sends heartbeats. |
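As a rough illustration, these parameters map onto consumer configuration as plain string keys. The sketch below uses only `java.util.Properties` (the class name `ConsumerConfigSketch`, the bootstrap server, and the group id are placeholders, not from the article); the values are the defaults quoted in the table, not tuning recommendations:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    public static Properties consumerProps() {
        Properties props = new Properties();
        // Connection and group settings: placeholders for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        // The parameters from the table above, set to their default values.
        props.put("max.poll.records", "500");
        props.put("fetch.max.bytes", "52428800");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.interval.ms", "300000");
        props.put("heartbeat.interval.ms", "3000");
        // Manual offset management, as the multi-threaded example below assumes.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.poll.records")); // prints 500
    }
}
```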
Rebalance
What triggers a rebalance?

When Kafka manages a consumer group, a partition rebalance is triggered whenever the group membership or the group's subscription changes. For example:
- A new partition is added to the topic.
- A consumer joins the group: a new instance is deployed with the same groupId.
- A consumer leaves the group, because:
  - max.poll.interval.ms expired: the polled records were not processed in time
  - session.timeout.ms expired: no heartbeat was sent, due to an application crash or a network error
  - the consumer was closed: the service was shut down
How should a rebalance be handled?

If automatic offset commits are enabled, you do not need to worry about group rebalances; the poll method takes care of everything. However, if you disable auto-commit and commit offsets manually, you need to commit them before the group rejoin request is sent. You can do this in two ways:
- Call commitSync() after processing a batch of messages:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // A failure after the DB insert but before the offset commit
        // still causes duplicate consumption.
        consumer.commitSync();
        buffer.clear();
    }
}
- Implement ConsumerRebalanceListener to be notified when partitions are about to be revoked, and commit the corresponding offsets at that point:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

    private Consumer<?, ?> consumer;

    public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Save/commit the offset of each revoked partition
        for (TopicPartition partition : partitions) {
            saveOffsetInExternalStore(consumer.position(partition));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do on assignment in this example
    }
}
The first approach is simpler, but when processing is very fast it can lead to overly frequent offset commits. The second approach is more efficient, and it is necessary when consumption and processing are fully decoupled.
Example code

The idea: the main thread polls a batch of messages from Kafka and creates tasks at partition granularity, handing them to a thread pool so that each task processes the records of a single partition; the main thread then polls the tasks' progress and commits offsets.
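Stripped of Kafka specifics, the dispatch pattern above can be sketched with plain JDK types. The class name `DispatchSketch`, the integer partition ids, and the string records are stand-ins for illustration, not Kafka API calls:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DispatchSketch {

    // Submits one task per partition and waits for all of them; returns the
    // number of records processed. A stand-in for the real poll/dispatch loop.
    static int processBatch(Map<Integer, List<String>> batch) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // One task per partition, tracked so completion can be checked later.
        Map<Integer, Future<Integer>> active = new HashMap<>();
        batch.forEach((partition, records) ->
                active.put(partition, pool.submit(() -> {
                    // A real task would process the records sequentially here,
                    // preserving the per-partition ordering Kafka guarantees.
                    return records.size();
                })));
        int processed = 0;
        for (Future<Integer> future : active.values()) {
            try {
                processed += future.get(); // the real loop polls isFinished() instead of blocking
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        pool.shutdown();
        return processed;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> batch = Map.of(0, List.of("a", "b"), 1, List.of("c"));
        System.out.println(processBatch(batch)); // prints 3
    }
}
```

The real code below adds what this sketch omits: pausing partitions that have in-flight tasks, resuming them when the task finishes, and committing offsets from the main thread.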
Creating the task
package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Processes the records of a single partition on a pool thread.
 *
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyConsumerTask<K, V> implements Runnable {

    private final List<ConsumerRecord<K, V>> records;
    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private volatile boolean finished = false;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ReentrantLock startStopLock = new ReentrantLock();
    private final AtomicLong currentOffset = new AtomicLong();

    public MyConsumerTask(List<ConsumerRecord<K, V>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return;
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }
        for (ConsumerRecord<K, V> record : records) {
            if (stopped) {
                break;
            }
            // Process the record here and make sure you catch all exceptions.
            // The committed offset is always the offset of the NEXT record to read,
            // hence the +1.
            currentOffset.set(record.offset() + 1);
        }
        finished = true;
        completion.complete(currentOffset.get());
    }

    public long getCurrentOffset() {
        return currentOffset.get();
    }

    public void stop() {
        startStopLock.lock();
        try {
            this.stopped = true;
            if (!started) {
                finished = true;
                completion.complete(currentOffset.get());
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1;
        }
    }

    public boolean isFinished() {
        return finished;
    }
}
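The startStopLock handshake matters because stop() can race with run(): if stop() wins, run() must never start processing, and waitForCompletion() must still return. A stripped-down stand-in, using only JDK types (the class name `TinyTask` and the value 42 are illustrative, not from the article), shows the pattern:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

public class TinyTask implements Runnable {

    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ReentrantLock startStopLock = new ReentrantLock();

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return; // stop() won the race: never start processing
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }
        completion.complete(42L); // stand-in for processing the records
    }

    public void stop() {
        startStopLock.lock();
        try {
            stopped = true;
            if (!started) {
                // run() has not begun, so it will never complete the future itself;
                // complete it here so waitForCompletion() cannot block forever.
                completion.complete(0L);
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() {
        return completion.join();
    }

    public static void main(String[] args) {
        TinyTask task = new TinyTask();
        task.stop();                                  // stop before the pool ever runs the task
        task.run();                                   // returns immediately without processing
        System.out.println(task.waitForCompletion()); // prints 0
    }
}
```

Without the lock, a stop() arriving between the `started = true` check and the processing loop could leave the future incomplete, and waitForCompletion() would block forever.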
Implementing ConsumerRebalanceListener
package com.mirai.boot.kafka.demo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author mirai
 */
@Slf4j
@AllArgsConstructor
public class MultiThreadedRebalancedListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, MyConsumerTask<String, String>> stoppedTasks = new HashMap<>();
        for (TopicPartition partition : partitions) {
            MyConsumerTask<String, String> task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTasks.put(partition, task);
            }
        }
        // 2. Wait for stopped tasks to complete processing of the current record
        stoppedTasks.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        // 3. Collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach(partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null) {
                revokedPartitionOffsets.put(partition, offset);
            }
        });
        // 4. Commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Assigned partitions may have been paused by a previous owner; resume them.
        consumer.resume(partitions);
    }
}
Multi-threaded consumption
package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyMultiThreadConsumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();

    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            },
            // Run the task on the polling thread when the queue is full; silently
            // discarding a task would leave its partition paused forever.
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public MyMultiThreadConsumer(Properties properties, String topic) {
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic),
                new MultiThreadedRebalancedListener(consumer, activeTasks, offsetsToCommit));
    }

    @Override
    public void run() {
        try {
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            if (!stopped.get()) {
                throw we;
            }
        } finally {
            consumer.close();
        }
    }

    public void stopConsuming() {
        // Interrupt a potentially blocked poll() so the loop can exit cleanly.
        stopped.set(true);
        consumer.wakeup();
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                MyConsumerTask<String, String> task = new MyConsumerTask<>(partitionRecords);
                partitionsToPause.add(partition);
                EXECUTOR.submit(task);
                activeTasks.put(partition, task);
            });
            // Pause partitions with in-flight tasks so the next poll() does not
            // fetch more records for them before the current batch is done.
            consumer.pause(partitionsToPause);
        }
    }

    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished()) {
                finishedTasksPartitions.add(partition);
            }
            long offset = task.getCurrentOffset();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        finishedTasksPartitions.forEach(activeTasks::remove);
        consumer.resume(finishedTasksPartitions);
    }

    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            // Commit at most once every 5 seconds to limit commit traffic.
            if (currentTimeMillis - lastCommitTime > 5000) {
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }
}
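The 5-second window in commitOffsets() can be isolated into a small clock-injected helper, which makes the throttling rule easy to unit-test without a broker. `CommitThrottle` is a hypothetical name for this sketch, not part of the example above:

```java
import java.util.function.LongSupplier;

public class CommitThrottle {

    private final long intervalMillis;
    private final LongSupplier clock; // injected so tests can control time
    private long lastCommitTime;

    public CommitThrottle(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.lastCommitTime = clock.getAsLong();
    }

    /** Returns true when the interval has elapsed; the caller then runs commitSync(). */
    public boolean shouldCommit() {
        long now = clock.getAsLong();
        if (now - lastCommitTime > intervalMillis) {
            lastCommitTime = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        CommitThrottle throttle = new CommitThrottle(5000, () -> fakeNow[0]);
        fakeNow[0] = 3000;
        System.out.println(throttle.shouldCommit()); // prints false: only 3s elapsed
        fakeNow[0] = 6000;
        System.out.println(throttle.shouldCommit()); // prints true: more than 5s elapsed
    }
}
```

One caveat the throttle highlights: offsets held in offsetsToCommit are exposed for up to 5 seconds, so a crash inside that window re-delivers those records. This is the usual at-least-once trade-off between commit traffic and duplicate work.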
References
- https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
- https://medium.com/bakdata/solving-my-weird-kafka-rebalancing-problems-c05e99535435