【kafka】Consumer API

Auto commit

Auto commit: the consumer commits its own offsets automatically. With enable.auto.commit=true, the offsets of records returned by poll are committed in the background at the interval set by auto.commit.interval.ms.


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        /**
         * Create the consumer configuration and put the settings:
         * 1. cluster ip:port (bootstrap.servers)
         * 2. enable auto commit
         * 3. auto commit interval
         * 4. key/value deserializer classes
         * 5. consumer group id
         */
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "c2");
        /**
         * Create the consumer --- KafkaConsumer
         * Subscribe to topics --- subscribe --- takes a collection of topic names
         * Subscribe to a specific topic + partition --- assign
         * Pull messages --- poll --- takes a timeout, the time to wait when no records are available
         */
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
        // subscribe to the whole topic (alternative):
        // kafkaConsumer.subscribe(Collections.singleton("test"));
        // assign only a single partition of the topic:
        TopicPartition test = new TopicPartition("test", 0);
        kafkaConsumer.assign(Collections.singleton(test));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String topic = record.topic();
                String key = record.key();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println(
                        "topic ==> " + topic + " partition ==> " + partition + " key ==> " + key + " offset ==> " + offset);
            }
        }
    }
}

Manual commit

Two commit modes: synchronous (commitSync) and asynchronous (commitAsync). The example below uses commitAsync; a commitSync variant is sketched after it.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "c2");

        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition test = new TopicPartition("test", 0);
        kafkaConsumer.assign(Collections.singleton(test));

        /**
         * As written, only newly produced (real-time) data is consumed.
         * To get --from-beginning behavior, the offset has to be reset
         * (a seekToBeginning sketch is shown at the end of this article).
         */
        while (true) {
            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String topic = record.topic();
                String key = record.key();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println(
                        "topic ==> " + topic + " partition ==> " + partition + " key ==> " + key + " offset ==> " + offset);
            }
            /**
             * Synchronous commit (commitSync): blocks; the next poll only happens after the commit succeeds.
             * Asynchronous commit (commitAsync): committing and polling run asynchronously, without blocking the poll loop.
             */
            kafkaConsumer.commitAsync();
        }
    }
}
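
The loop above uses commitAsync. For reference, the same consumer can commit synchronously instead, which trades throughput for not losing track of a failed commit. This is a minimal sketch rather than the original code, assuming the same localhost:9092 broker, "test" topic, partition 0 and group "c2" as above; the class name ManualSyncCommit and the CommitFailedException handling are illustrative only.

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualSyncCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "c2");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.assign(Collections.singleton(new TopicPartition("test", 0)));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // process the record before its offset is committed
                System.out.println("topic ==> " + record.topic() + " partition ==> " + record.partition()
                        + " offset ==> " + record.offset());
            }
            try {
                // blocks until the broker acknowledges the commit; only then does the next poll run
                kafkaConsumer.commitSync();
            } catch (CommitFailedException e) {
                // e.g. the group rebalanced during processing; the uncommitted records will be redelivered
                e.printStackTrace();
            }
        }
    }
}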

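The comment in ManualCommit notes that, as written, the consumer only sees newly produced records and that --from-beginning behavior needs an offset reset. With manually assigned partitions one way to do that is seekToBeginning. The lines below are a sketch meant to slot in right after the assign call in ManualCommit, reusing the same kafkaConsumer and TopicPartition test from that class.

        kafkaConsumer.assign(Collections.singleton(test));
        // move the position of the assigned partition back to the earliest available offset,
        // so the next poll starts from the start of the log (like the console consumer's --from-beginning)
        kafkaConsumer.seekToBeginning(Collections.singleton(test));

For a subscribe()-based consumer that has no previously committed offsets, setting auto.offset.reset to "earliest" in the properties has a similar effect.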