【kafka】生产者API 回调 同步

普通实现


public class MyProducer {
    public static void main(String[] args) {
        /**
         * 创建Kafka生产者配置信息:ProducerConfig类中记录了Kafka需要的所有参数信息
         * 1.指定连接的Kafka集群
         * 2.ack应答级别
         * 3.发送失败的重试次数
         * 4.批次大小(一次发送多少大小数据)
         * 5.等待时间
         * 6.RecordAccumulator缓冲区大小
         * 7.指定key,value序列化类
         */
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("liner.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**
         *  通过配置文件创建生产者对象
         */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
        // 创建记录ProducerRecord("Topic","partition","key","value")
            ProducerRecord<String, String> message =
                    new ProducerRecord<String, String>("test", 0,"MyProducer","hello" + i);
            // send:异步方法,发送之后,立即返回,并不是说调用了,就真的发送成功了;
            kafkaProducer.send(message);
        }
        // 关闭连接:会清空内存
        kafkaProducer.close();
    }
}

同步实现

public class MyProducerFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("liner.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "hello" + i);
            /**
             * 同步发送,send返回 Future对象
             * 调用get()
             * 返回RecordMetadata元数据记录,记录了发送的topic,partition,offset
             */
            RecordMetadata metadata = kafkaProducer.send(message).get();
            String topic = metadata.topic();
            int partition = metadata.partition();
            long offset = metadata.offset();
            System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
        }
        // 关闭连接:会清空内存
        kafkaProducer.close();
    }
}

回调实现

public class MyProducerCallback {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("liner.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "messuesein--" + i);
            /**
             * 发送消息:带回调
             * 传入CallBack函数接口,参数:
             * 1. RecordMetadata:成功返回元数据记录
             * 2. Exception:失败返回异常
             */
            kafkaProducer.send(message, (metadata, exception) -> {
                // exception==null,即成功
                if (exception == null) {
                    /**
                     * metadata记录元数据信息
                     */
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // 关闭连接:会清空内存
        kafkaProducer.close();
    }
}

【kafka】生产者API 回调 同步

上一篇:Unable to connect to Command Metric Stream 'api/turbine/stream?cluster=REPORTS' Error: {"isTrusted"}


下一篇:C# 将Word转为PDF、XPS、Epub、RTF(基于Spire.Cloud.Word.SDK)