上一篇介绍了如何使用Kafka的生产者,这一篇将介绍在实际生产中如何合理地使用Kafka的消费者API。
Kafka中消费者API分为新版和旧版,本章只介绍新版,旧版的就不做介绍了。
首发于我的个人博客:http://www.janti.cn/article/kafkaconsumer
准备工作
kafka版本:2.11-1.1.1
操作系统:centos7
java:jdk1.8
有了以上这些条件就OK了,具体怎么安装和启动Kafka这里就不强调了,可以看上一篇文章。
新建一个maven工程,需要的依赖如下:
<dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka_2.11artifactId> <version>1.1.1version> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>1.1.1version> dependency>
简单的消费者例子
Kafka中是封装了KafkaConsumer类,消息接受都是通过该类来进行的。与生产者一样,在实例化消费者之前都是要进行配置的。
首先介绍这里面的配置:- bootstrap.servers:配置连接代理列表,不必包含Kafka集群的所有代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上Kafka集群,在多代理集群的情况下,建议至少配置两个代理。(由于电脑配置有限,本文实验的是单机情况)
- key.deserializer : 用于反序列化消息Key的类
- value.deserializer :用于反序列化消息值(Value)的类
- group.id:指定消费者所在的组
- client.id:指定客户端所在组的ID
- enable.auto.commit:设置是否自动提交。在没有指定消费偏移量提交方式时,默认是每隔1S发送一次
- auto.commit.interval.ms:自动提交偏移值的时间间
package kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * 简单的消费者 * * @author tangj */ public class KafkaSimpleDemo { static Properties properties = new Properties(); private static String topic = "MyOrder"; //poll超时时间 private static long pollTimeout = 2000; // 1.消费者配置 static { // bootstarp server 地址 properties.put("bootstrap.servers", "10.0.90.53:9092"); // group.id指定消费者,所在的组 properties.put("group.id", "order"); // 组中client 的ID名称 properties.put("client.id", "consumer"); // 在没有指定消费偏移量提交方式时,默认是每个1s提交一次偏移量,可以通过auto.commit.interval.ms参数指定提交间隔 // 自动提交要设置成true // 手动提交设置成false properties.put("enable.auto.commit", true); // 自动提交偏移值的 时间间隔 properties.put("auto.commit.interval.ms", 2000); // key序列化 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value序列化 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } public static void main(String args[]) { consumeAutoCommit(); } /** * 消费消息自动提交偏移值 */ public static void consumeAutoCommit() { //2\. 实例化消费者 KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); // 3.订阅主题 kafkaConsumer.subscribe(Arrays.asList(topic)); try { // 消费者是一个长期的过程,所以使用永久循环, while (true) { // 4.拉取消息 ConsumerRecords records = kafkaConsumer.poll(pollTimeout); for (ConsumerRecord record : records) { System.out.println("消息总数为: " + records.count()); System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n", record.partition(), record.offset(), record.key(), record.value())); } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }看了简单的消费者模式,来看看消费者中的其他知识:消费模型,分区均衡,偏移量的提交,多线程消费者。
消费模型
可以看到在配置的时候,配置了消费者组这个变量。这里主要讲讲消费模型。消费模型主要分为两种:消费者组模型,发布订阅模型 消费者组中有多个消费者,组内的消费者共享一个group.id,并且有一个单独的client.id。消费者组用来消费一个主题下的所有分区,但是每个分区只能由该组内的一个消费者消费,不会被重复消费。 发布订阅模型就是所有的消费者都可以通过订阅来获取Kafka中的消息。分区再平衡
介绍一种情况,消费者并不是越多越好的,当消费者大于分区数时,就会有部分消费者一直空闲着。 分区再平衡,即:parition rebanlance。总的来说分区再平衡就是一个消费者原来消费的分区变成由其他消费者消费,它只发生在消费者组中。 再均衡的作用就是为了保证消费者组的高可用和伸缩性,但是再均衡期间消费者会无法读取消息,有短暂的暂停时间。偏移量的处理
偏移量的左右就是记录已经消费的消息。之前的一个简单的demo演示了自动提交的方式,接下来介绍手动提交。 在实际生产中,消费者拉取到消息之后会进行一些业务处理,比如存到数据库,写入缓存,网络请求等,这些都会有失败的可能,所以要对偏移值进行更精细的控制。 手动提交有两种方式:第一种是同步提交,同步提交是阻塞的,对于提交失败的处理,它会一直提交,直到提交成功。 第二种是异步提交,异步提交是非阻塞的,对于提交失败,也不会重新提交。 当然,对于手动提交的业务设计,还是要结合具体业务进行考虑和设计。手动提交之后,需要设置enable.auto.commit为false.并且不需要设置auto.commit.interval.ms。下面给出一个自动提交的例子,设置没消费5次,提交一次偏移值:
public static void consumeHandleCommit() {
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
try {
int maxcount = 5;
int count = 0;
kafkaConsumer.subscribe(Arrays.asList(topic));
for (; ; ) {
// 拉取消息
ConsumerRecords records = kafkaConsumer.poll(polltimeout);
for (ConsumerRecord record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
count++;
}
// 业务逻辑完成后,提交偏移量
if (count >= maxcount) {
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (null != exception) {
exception.printStackTrace();
} else {
System.out.println("偏移值提交成功");
}
}
});
count = 0;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
多线程消费者
单线程的消费者效率肯定是低于多线程的消费者的,但是消费者的多线程设计与生产者不同,KafkaConsumer是非线程安全的。
保证线程安全,每个线程,各自实例化一个KafkaConsumer对象,并且多个消费者线程只消费同一个主题,不考虑多个消费者线程消费同一个分区。
线程实体类:
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumerThread implements Runnable {
//每个线程私有一个consumer实例
private KafkaConsumer<String,String> consumer;
public KafkaConsumerThread(Map<String,Object> configMap,String topic) {
Properties properties = new Properties();
properties.putAll(configMap);
this.consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
for (; ; ) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
线程启动类:
package kafka.consumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExcutor {
/**
* kafkaConsumer是非线程安全的,处理好多线程同步的方案是
* 每个线程实例化一个kafkaConsumer对象
*/
public static void main(String args[]) {
String topic = "hello";
Map<String, Object> configMap = new HashMap<>();
configMap.put("bootstrap.servers", "10.0.90.53:9092");
//group.id指定消费者,所在的组,保证所有线程都在一个消费者组
configMap.put("group.id", "test");
configMap.put("enable.auto.commit", true);
configMap.put("auto.commit.interval.ms", 1000);
// key序列化
configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value序列化
configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ExecutorService service = Executors.newFixedThreadPool(6);
// 该主题总共有6个分区,那么保证6个线程
for (int i = 0; i < 6; i++) {
service.submit(new KafkaConsumerThread(configMap, topic));
}
}
}
总结
本文介绍了kafka中的消费者,包括多线程消费者,偏移量的处理,以及分区再平衡。
切记一点,实际的生产中消费者需要根据实际业务来进行设计。