1、单独KafkaConsumer实例and多worker线程。
将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。
本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
1 package com.bie.kafka.kafkaWorker; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Collection; 6 import java.util.Collections; 7 import java.util.HashMap; 8 import java.util.Map; 9 import java.util.Properties; 10 import java.util.concurrent.ArrayBlockingQueue; 11 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.ThreadPoolExecutor; 13 import java.util.concurrent.TimeUnit; 14 15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 16 import org.apache.kafka.clients.consumer.ConsumerRecords; 17 import org.apache.kafka.clients.consumer.KafkaConsumer; 18 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 19 import org.apache.kafka.common.TopicPartition; 20 import org.apache.kafka.common.errors.WakeupException; 21 22 /** 23 * 24 * @Description TODO 25 * @author biehl 26 * @Date 2019年6月1日 下午3:28:53 27 * 28 * @param <K> 29 * @param <V> 30 * 31 * 1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。 32 * 33 */ 34 public class ConsumerThreadHandler<K, V> { 35 36 // KafkaConsumer实例 37 private final KafkaConsumer<K, V> consumer; 38 // ExecutorService实例 39 private ExecutorService executors; 40 // 位移信息offsets 41 private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); 42 43 /** 44 * 45 * @param brokerList 46 * kafka列表 47 * @param groupId 48 * 消费组groupId 49 * @param topic 50 * 主题topic 51 */ 52 public ConsumerThreadHandler(String brokerList, String groupId, String topic) { 53 Properties props = new Properties(); 54 // broker列表 55 props.put("bootstrap.servers", brokerList); 56 // 消费者组编号Id 57 props.put("group.id", groupId); 58 // 非自动提交位移信息 59 props.put("enable.auto.commit", "false"); 60 // 从最早的位移处开始消费消息 61 props.put("auto.offset.reset", "earliest"); 62 // key反序列化 63 props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 64 // value反序列化 65 props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 66 // 将配置信息装配到消费者实例里面 67 consumer = new KafkaConsumer<>(props); 68 // 消费者订阅消息,并实现重平衡rebalance 69 // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。 70 // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。 71 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { 72 73 /** 74 * 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。 75 */ 76 @Override 77 public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 78 // 提交位移 79 consumer.commitSync(offsets); 80 } 81 82 /** 83 * rebalance完成后会调用onPartitionsAssigned方法。 84 */ 85 @Override 86 public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 87 // 清除位移信息 88 offsets.clear(); 89 } 90 }); 91 } 92 93 /** 94 * 消费主方法 95 * 96 * @param threadNumber 97 * 线程池中的线程数 98 */ 99 public void consume(int threadNumber) { 100 executors = new ThreadPoolExecutor( 101 threadNumber, 102 threadNumber, 103 0L, 104 TimeUnit.MILLISECONDS, 105 new ArrayBlockingQueue<Runnable>(1000), 106 new ThreadPoolExecutor.CallerRunsPolicy()); 107 try { 108 // 消费者一直处于等待状态,等待消息消费 109 while (true) { 110 // 从主题中获取消息 111 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L)); 112 // 如果获取到的消息不为空 113 if (!records.isEmpty()) { 114 // 将消息信息、位移信息封装到ConsumerWorker中进行提交 115 executors.submit(new ConsumerWorker<>(records, offsets)); 116 } 117 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间 118 this.commitOffsets(); 119 } 120 } catch (WakeupException e) { 121 // 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常 122 //如果不忽略异常信息,此处会打印错误哦,亲 123 //e.printStackTrace(); 124 } finally { 125 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间 126 this.commitOffsets(); 127 // 关闭consumer 128 consumer.close(); 129 } 130 } 131 132 /** 133 * 尽量降低synchronized块对offsets锁定的时间 134 */ 135 private void commitOffsets() { 136 // 尽量降低synchronized块对offsets锁定的时间 137 Map<TopicPartition, OffsetAndMetadata> unmodfiedMap; 138 // 保证线程安全、同步锁,锁住offsets 139 synchronized (offsets) { 140 // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间 141 if (offsets.isEmpty()) { 142 return; 143 } 144 // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步 145 unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets)); 146 // 清除位移信息offsets 147 offsets.clear(); 148 } 149 // 将封装好的位移信息unmodfiedMap集合进行同步提交 150 // 手动提交位移信息 151 consumer.commitSync(unmodfiedMap); 152 } 153 154 /** 155 * 关闭消费者 156 */ 157 public void close() { 158 // 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。 159 // KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。 160 // wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用 161 consumer.wakeup(); 162 // 关闭ExecutorService实例 163 executors.shutdown(); 164 } 165 166 }
1 package com.bie.kafka.kafkaWorker; 2 3 import java.util.List; 4 import java.util.Map; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 9 import org.apache.kafka.common.TopicPartition; 10 11 /** 12 * 13 * @Description TODO 14 * @author biehl 15 * @Date 2019年6月1日 下午3:45:38 16 * 17 * @param <K> 18 * @param <V> 19 * 20 * 1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。 21 * 22 */ 23 public class ConsumerWorker<K, V> implements Runnable { 24 25 // 获取到的消息 26 private final ConsumerRecords<K, V> records; 27 // 位移信息 28 private final Map<TopicPartition, OffsetAndMetadata> offsets; 29 30 /** 31 * ConsumerWorker有参构造方法 32 * 33 * @param records 34 * 获取到的消息 35 * @param offsets 36 * 位移信息 37 */ 38 public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) { 39 this.records = records; 40 this.offsets = offsets; 41 } 42 43 /** 44 * 45 */ 46 @Override 47 public void run() { 48 // 获取到分区的信息 49 for (TopicPartition partition : records.partitions()) { 50 // 获取到分区的消息记录 51 List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition); 52 // 遍历获取到的消息记录 53 for (ConsumerRecord<K, V> record : partConsumerRecords) { 54 // 打印消息 55 System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: " 56 + record.offset() 57 + ",消息记录: " + record.value()); 58 } 59 // 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置 60 long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset(); 61 // 同步锁,锁住offsets位移 62 synchronized (offsets) { 63 // 如果offsets位移不包含partition这个key信息 64 if (!offsets.containsKey(partition)) { 65 // 就将位移信息设置到map集合里面 66 offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); 67 } else { 68 // 否则,offsets位移包含partition这个key信息 69 // 获取到offsets的位置信息 70 long curr = offsets.get(partition).offset(); 71 // 如果获取到的位置信息小于等于上一次位移信息大小 72 if (curr <= lastOffset + 1) { 73 // 将这个partition的位置信息设置到map集合中。并保存到broker中。 74 offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); 75 } 76 } 77 } 78 } 79 } 80 81 }
1 package com.bie.kafka.kafkaWorker; 2 3 /** 4 * 5 * @Description TODO 6 * @author biehl 7 * @Date 2019年6月1日 下午4:13:25 8 * 9 * 1、单独KafkaConsumer实例和多worker线程。 10 * 2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中, 11 * 同时维护一个或者若各干consumer实例执行消息获取任务。 12 * 3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作, 13 * 之后worker线程完成处理后上报位移状态,由全局consumer提交位移。 14 * 15 * 16 */ 17 18 public class ConsumerMain { 19 20 public static void main(String[] args) { 21 // broker列表 22 String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092"; 23 // 主题信息topic 24 String topic = "topic1"; 25 // 消费者组信息group 26 String groupId = "group2"; 27 // 根据ConsumerThreadHandler构造方法构造出消费者 28 final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic); 29 final int cpuCount = Runtime.getRuntime().availableProcessors(); 30 System.out.println("cpuCount : " + cpuCount); 31 // 创建线程的匿名内部类 32 Runnable runnable = new Runnable() { 33 34 @Override 35 public void run() { 36 // 执行consume,在此线程中执行消费者消费消息。 37 handler.consume(cpuCount); 38 } 39 }; 40 // 直接调用runnable此线程,并运行 41 new Thread(runnable).start(); 42 43 try { 44 // 此线程休眠20000 45 Thread.sleep(20000L); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 System.out.println("Starting to close the consumer..."); 50 // 关闭消费者 51 handler.close(); 52 } 53 54 }
待续......