Multi-Threaded Kafka Consumption

This article gives a brief introduction to consuming Kafka messages with multiple threads.

Note: the examples below use Kafka 2.2.

Consumer Configuration

When reading messages from Kafka, the following consumer settings deserve attention.

  • max.poll.records (default 500): the maximum number of records returned by a single call to poll().

  • fetch.max.bytes (default 52428800): the maximum amount of data the server should return for a fetch request. Records are fetched in batches, and if the first record batch in the first non-empty partition is larger than this value, the batch is still returned so the consumer can make progress; it is therefore not an absolute maximum. The maximum record batch size accepted by the broker is defined by message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

  • session.timeout.ms (default 10000): the consumer periodically sends heartbeats to the broker; if the broker receives no heartbeat before this timeout expires, it removes the consumer from the group and starts a rebalance.

  • max.poll.interval.ms (default 300000): the maximum delay between two calls to poll(); if it is exceeded, the broker removes the consumer from the group and starts a rebalance.

  • heartbeat.interval.ms (default 3000): how often the consumer sends heartbeats.
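These settings are plain string-keyed consumer properties. A minimal sketch of a configuration carrying them (the bootstrap address, group id, and chosen values are placeholder assumptions, not recommendations):

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Builds consumer properties with the settings discussed above.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder address
        props.put("group.id", "demo-group");                // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "200");        // default 500
        props.put("fetch.max.bytes", "52428800");    // default 50 MB
        props.put("session.timeout.ms", "10000");    // default 10 s
        props.put("max.poll.interval.ms", "300000"); // default 5 min
        props.put("heartbeat.interval.ms", "3000");  // default 3 s
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```

These same Properties are what the sample consumer at the end of this article receives in its constructor.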

Rebalance

What triggers a rebalance?

When Kafka manages a consumer group, a partition rebalance is triggered whenever the group membership or the group's subscriptions change. For example:

  • a new partition is added to a topic

  • a consumer joins the group: a new instance is deployed using the same groupId

  • a consumer leaves the group:

    • max.poll.interval.ms expired because the polled records were not processed in time
    • session.timeout.ms expired because the application crashed or a network error prevented heartbeats from being sent
    • the consumer was closed, e.g. the service was shut down

What should you do about rebalances?

If automatic offset commits are enabled, you don't need to worry about group rebalances; everything is handled by the poll method. But if you disable auto-commit and commit offsets manually, you need to make sure offsets are committed before the partitions are revoked. You can do this in two ways:

  1. Call commitSync() after processing a batch of messages

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            // A failure after writing to the DB but before committing the
            // offset will still lead to duplicate consumption
            consumer.commitSync();
            buffer.clear();
        }
    }
    
  2. Implement ConsumerRebalanceListener so you are notified when partitions are about to be revoked, and commit the corresponding offsets at that point.

    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private Consumer<?, ?> consumer;

        public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Save the offset of each partition that is about to be revoked
            // (saveOffsetInExternalStore is a placeholder for your own storage)
            for (TopicPartition partition : partitions) {
                saveOffsetInExternalStore(consumer.position(partition));
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Seek to the offsets previously saved for the newly assigned partitions
            for (TopicPartition partition : partitions) {
                consumer.seek(partition, readOffsetFromExternalStore(partition));
            }
        }
    }
    

The first approach is simpler, but when processing is very fast it may commit offsets too frequently. The second approach is more efficient, and it is necessary when consumption and processing are fully decoupled.

Sample Code

The idea: the main thread polls a batch of records from Kafka and creates one task per partition (the partition is the smallest unit of work), handing the tasks to a thread pool; each task processes the records of a single partition, while the main thread checks task progress and commits offsets.
(Diagram: multi-threaded Kafka consumption)

Creating the Task

package com.mirai.boot.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyConsumerTask implements Runnable {
    private final List<ConsumerRecord<String, String>> records;

    private volatile boolean stopped = false;

    private volatile boolean started = false;

    private volatile boolean finished = false;

    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    private final ReentrantLock startStopLock = new ReentrantLock();

    private final AtomicLong currentOffset = new AtomicLong();

    public MyConsumerTask(List<ConsumerRecord<String, String>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return;
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }

        for (ConsumerRecord<String, String> record : records) {
            if (stopped) {
                break;
            }
            // process the record here and make sure you catch all exceptions
            currentOffset.set(record.offset() + 1);
        }
        finished = true;
        completion.complete(currentOffset.get());
    }

    public long getCurrentOffset() {
        return currentOffset.get();
    }

    public void stop() {
        startStopLock.lock();
        try {
            this.stopped = true;
            if (!started) {
                finished = true;
                completion.complete(currentOffset.get());
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        }
    }

    public boolean isFinished() {
        return finished;
    }
}
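The stop/complete handshake in MyConsumerTask can be exercised without Kafka. The sketch below (a hypothetical stand-in class, plain JDK only) reproduces the same pattern with a list of longs playing the role of record offsets: a task that finishes normally completes with the next offset to commit, while stop() before the task ever starts completes it immediately at offset 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// JDK-only stand-in for MyConsumerTask; longs stand in for record offsets.
public class TaskHandshakeDemo implements Runnable {
    private final List<Long> offsets;
    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ReentrantLock startStopLock = new ReentrantLock();
    private volatile long currentOffset = 0;

    public TaskHandshakeDemo(List<Long> offsets) {
        this.offsets = offsets;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return; // stop() won the race: process nothing
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }
        for (long offset : offsets) {
            if (stopped) {
                break;
            }
            currentOffset = offset + 1; // "processed" up to this offset
        }
        completion.complete(currentOffset);
    }

    public void stop() {
        startStopLock.lock();
        try {
            stopped = true;
            if (!started) {
                completion.complete(currentOffset); // never started: complete at once
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() throws Exception {
        return completion.get();
    }

    public static void main(String[] args) throws Exception {
        TaskHandshakeDemo ran = new TaskHandshakeDemo(Arrays.asList(0L, 1L, 2L));
        ran.run();
        System.out.println(ran.waitForCompletion()); // 3: next offset to commit

        TaskHandshakeDemo stoppedEarly = new TaskHandshakeDemo(Arrays.asList(0L, 1L, 2L));
        stoppedEarly.stop(); // stop before it ever runs
        stoppedEarly.run();
        System.out.println(stoppedEarly.waitForCompletion()); // 0: nothing processed
    }
}
```

The lock is what makes stop() and the start of run() atomic with respect to each other, so a task can never be "stopped" yet still start processing its whole batch.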

Implementing ConsumerRebalanceListener

package com.mirai.boot.kafka.demo;

import com.mirai.boot.kafka.MyConsumerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author mirai
 */
@Slf4j
@AllArgsConstructor
public class MultiThreadedRebalancedListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask> activeTasks;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, MyConsumerTask> stoppedTasks = new HashMap<>();
        for (TopicPartition partition : partitions) {
            MyConsumerTask task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTasks.put(partition, task);
            }
        }

        // 2. Wait for stopped tasks to finish processing the current record
        stoppedTasks.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });

        // 3. Collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach(partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null) {
                revokedPartitionOffsets.put(partition, offset);
            }
        });

        // 4. Commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Newly assigned partitions may have been paused by a previous owner; resume them
        consumer.resume(partitions);
    }

}

The Multi-Threaded Consumer

package com.mirai.boot.kafka.demo;

import com.mirai.boot.kafka.MyConsumerTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyMultiThreadConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();

    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            },
            // CallerRunsPolicy instead of DiscardPolicy: a silently discarded task
            // would never finish, leaving its paused partition stuck forever
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public MyMultiThreadConsumer(Properties properties, String topic) {
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic),
                new MultiThreadedRebalancedListener(consumer, activeTasks, offsetsToCommit));
    }

    @Override
    public void run() {
        try {
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            // consumer.wakeup() was called from stopConsuming(); only rethrow if we are not stopping
            if (!stopped.get()) {
                throw we;
            }
        } finally {
            consumer.close();
        }
    }

    public void stopConsuming() {
        stopped.set(true);
        consumer.wakeup();
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                MyConsumerTask task = new MyConsumerTask(partitionRecords);
                partitionsToPause.add(partition);
                EXECUTOR.submit(task);
                activeTasks.put(partition, task);
            });
            consumer.pause(partitionsToPause);
        }
    }

    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished()) {
                finishedTasksPartitions.add(partition);
            }
            long offset = task.getCurrentOffset();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        finishedTasksPartitions.forEach(activeTasks::remove);
        consumer.resume(finishedTasksPartitions);
    }

    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - lastCommitTime > 5000) {
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }
}
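With a bounded queue, the thread pool's rejection policy matters: DiscardPolicy silently drops an overflow task, which in this design would leave that task's partition paused forever, while CallerRunsPolicy runs the overflow task on the submitting (polling) thread so it always completes. A JDK-only sketch (hypothetical class name, no Kafka dependency) demonstrating the CallerRunsPolicy behavior:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Returns true when the overflow task ran on the submitting thread.
    public static boolean overflowRunsOnCaller() throws InterruptedException {
        // Single worker, queue capacity 1: the third task overflows the pool.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Occupies the only worker until released
        pool.execute(() -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { }); // fills the queue
        Thread caller = Thread.currentThread();
        final Thread[] ranOn = new Thread[1];
        // Rejected by the pool, so CallerRunsPolicy runs it synchronously here
        pool.execute(() -> ranOn[0] = Thread.currentThread());
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0] == caller;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(overflowRunsOnCaller());
    }
}
```

Running the overflow task on the polling thread does delay the next poll() call, so with this policy the queue and pool should be sized so that rejection stays rare; the alternative is an unbounded queue, since the number of in-flight tasks is already capped by the number of paused partitions.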
