头歌educoder-kafka入门篇

头歌educoder-kafka入门篇

第一关kafka-初体验

头歌educoder-kafka入门篇

#!/bin/bash
#1.创建一个名为demo的Topic
kafka-topics.sh -create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo
#2.查看所有Topic
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
#3.查看名为demo的Topic的详情信息
kafka-topics.sh -topic demo --describe --zookeeper 127.0.0.1:2181

第二关:生产者-简单模式

头歌educoder-kafka入门篇

package net.educoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * kafka producer 简单模式
 */
public class App {
    public static void main(String[] args) {
        /**
         * 1.创建配置文件对象,一般采用 Properties
         */
        /**----------------begin-----------------------*/
        Properties props = new Properties();
        /**-----------------end-------------------------*/
        /**
         * 2.设置kafka的一些参数
         *          bootstrap.servers --> kafka的连接地址 127.0.0.1:9092
         *          key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer
         *          acks:1,-1,0
         */
        /**-----------------begin-----------------------*/
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**-----------------end-------------------------*/
        /**
         * 3.构建kafkaProducer对象
         */
        /**-----------------begin-----------------------*/
        Producer<String, String> producer = new KafkaProducer<>(props);
        /**-----------------end-------------------------*/
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo", i + "", i + "");
            /**
             * 4.发送消息
             */
            /**-----------------begin-----------------------*/
            producer.send(record);
            /**-----------------end-------------------------*/
        }
        producer.close();
    }
}

第三关:消费者-自动提交偏移量

头歌educoder-kafka入门篇

package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class App {
    public static void main(String[] args) {
        Properties props = new Properties();
        /**--------------begin----------------*/
        //1.设置kafka集群的地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        //2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "g1");
        //3.开启offset自动提交
        props.put("enable.auto.commit", "true");
        //4.自动提交时间间隔
        props.put("auto.commit.interval.ms", "1000");
        //5.序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**---------------end---------------*/
        /**--------------begin----------------*/
        //6.创建kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //7.订阅kafka的topic
        consumer.subscribe(Arrays.asList("demo"));
        /**---------------end---------------*/
        int i = 1;
        while (true) {
            /**----------------------begin--------------------------------*/
            //8.poll消息数据,返回的变量为crs
            ConsumerRecords<String, String> crs = consumer.poll(100);
            for (ConsumerRecord<String, String> cr : crs) {
                System.out.println("consume data:" + i);
                i++;
            }
            /**----------------------end--------------------------------*/
            if (i > 10) {
                return;
            }
        }
    }
}

第四关:消费者-手动提交偏移量

头歌educoder-kafka入门篇

package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class App {
    public static void main(String[] args){
        Properties props = new Properties();
        /**-----------------begin------------------------*/
        //1.设置kafka集群的地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        //2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "g1");
        //3.关闭offset自动提交
        props.put("enable.auto.commit", "false");
        //4.序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**-----------------end------------------------*/
        /**-----------------begin------------------------*/
        //5.实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //6.消费者订阅主题,订阅名为demo的主题
        consumer.subscribe(Arrays.asList("demo"));
        /**-----------------end------------------------*/
        final int minBatchSize = 10;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                for (ConsumerRecord bf : buffer) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
                }
                /**-----------------begin------------------------*/
                //7.手动提交偏移量
                consumer.commitSync();
                /**-----------------end------------------------*/
                buffer.clear();
                return;
            }
        }
    }
}

上一篇:C/C++ -- Gui编程 -- Qt库的使用 -- Qt编码问题


下一篇:Kafka术语简介