Kafka参数

配置

@KafkaListener注解

@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
    public void listen(String msgData) {
    LOGGER.info("收到消息" + msgData);
}  

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen3(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen4(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

(1)id:默认是每个Listener实例的重要标识。

        对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

Kafka参数

(2)goupId:每个消费者所属的组。

        每个消费者都有自己所属的组。一个组中可以有多个消费者。

Kafka参数

        一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA

[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA

[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA

[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

 (3)clientIdPrefix:消费者clientId前缀

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

        如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。我这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

 Kafka参数

@KafkaListener注解方法参数汇总

         共8种方法可供调用。

    public void listen1(String data) 

    public void listen2(ConsumerRecord<K,V> data) 

    public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 

    public void listen4(ConsumerRecord<K,V> data,
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

    public void listen5(List<String> data) 

    public void listen6(List<ConsumerRecord<K,V>> data) 

    public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 

    public void listen8(List<ConsumerRecord<K,V>> data, 
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

上一篇:RESTFul风格的URL


下一篇:使用Spring Cloud Feign作为HTTP客户端调用远程HTTP服务