Kafka消费端过滤数据方案

前言:

kafka一些常用命令:

cd /opt/module/kafka

查看kafka主题:

./kafka-topics.sh --list --zookeeper localhost:2181

 查看主题详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaTopic1

kafka消费端数据过滤方案:

在生产端不做配置,只管按特定主题生产数据。

在消费端,对特定主题数据进行分组获取,然后在获取过程中对符合业务条件的数据进行处理,否则跳过,但还是会告诉kafka我已经消费过了。

示例代码:

生产端:

 public String hello()
    {
        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.11.101:9092");
        props.put("acks", "all");
        props.put("retries", "5");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        x++;
        producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value1:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value11:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value2:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value22:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value3:"+x));
        producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value33:"+x));


        return "ok";
    }

消费端:

 @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group1")})
    public void aaaa(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key1"))
        {
            System.out.printf("group1独立获取key1:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }
    @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group2")})
    public void bbbb(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key2"))
        {
            System.out.printf("group2独立获取key2:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }
   @KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group3")})
    public void ccc(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        String processRecord = (String) record.value();
        String x = (String)record.key();
        if(x.equals("key3"))
        {
            System.out.printf("group3独立获取key3:"+processRecord+"\r\n");

        }
        acknowledgment.acknowledge();
    }

其中,group1和group2在调试环境下运行,group3在打包jar后,在命令行模式下运行,在调用生产端生产数据后,消费端的消费情况如下:

Kafka消费端过滤数据方案

完全符合预期。

上一篇:SpringBoot 启动流程探究


下一篇:Springboot(二)——自动装原理