配置
@KafkaListener注解
@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
public void listen(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
topics = Constants.TOPIC)
public void listen2(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen3(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen4(String msgData) {
LOGGER.info("收到消息" + msgData);
}
(1)id:默认是每个Listener实例的重要标识。
对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。
(2)goupId:每个消费者所属的组。
每个消费者都有自己所属的组。一个组中可以有多个消费者。
一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。
[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA
(3)clientIdPrefix:消费者clientId前缀
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
topics = Constants.TOPIC)
public void listen2(String msgData) {
LOGGER.info("收到消息" + msgData);
}
如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。我这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。
@KafkaListener注解方法参数汇总
共8种方法可供调用。
public void listen1(String data)
public void listen2(ConsumerRecord<K,V> data)
public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K,V> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)
public void listen5(List<String> data)
public void listen6(List<ConsumerRecord<K,V>> data)
public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K,V>> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)