Kafka

 

• 命令:

    启动zookeeper:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

   启动kafka:

bin\windows\kafka-server-start.bat config\server.properties

   创建topic:

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

   创建生产者:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

   创建消费者:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

 

 

 

• 在java中操作:

   1、生产者添加消息:

/**
 * Minimal producer demo: sends a single record to the {@code test} topic on a
 * local broker and then prints the partition metadata of that topic.
 */
public class Kafka {
    /**
     * Builds a String/String producer against {@code localhost:9092}, sends the
     * value {@code "444"} to topic {@code test}, prints the result of the send,
     * and prints every {@code PartitionInfo} reported for the topic.
     *
     * <p>Any failure (including a failure to construct the producer) is printed
     * to standard error rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("acks", "all");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees the producer is closed on every exit path;
        // the original never closed it.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "444");

            // get() blocks until the send has completed (or failed), so the
            // printed value is the final outcome of this send.
            System.out.println(producer.send(record).get());

            for (PartitionInfo p : producer.partitionsFor("test")) {
                System.out.println(p);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

   2、消费者读取消息:

/**
 * Minimal consumer demo: subscribes to the {@code test} topic on a local
 * broker as consumer group {@code test} and prints every record it receives.
 */
public class KafkaRead {
    /**
     * Builds a String/String consumer against {@code localhost:9092}, subscribes
     * to topic {@code test}, and polls in an endless loop, printing each record.
     *
     * <p>The loop only ends when an exception is thrown; that exception is
     * printed to standard error and the consumer is closed before returning.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer is closed when the loop
        // is aborted by an exception; the original never closed it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(java.util.Collections.singletonList("test"));

            while (true) {
                // NOTE(review): newer client versions favour poll(Duration) over
                // poll(long) - confirm against the client version in use.
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.toString());
                }
            }
        } catch (Exception e) {
            // Previously swallowed silently; report it so a failed run is diagnosable.
            e.printStackTrace();
        }
    }
}

 

Kafka

上一篇:HashiCorp 创始人卸任,回归初心再做全职工程师


下一篇:Azure DevOps Server Pipelines需要注意的几个问题